X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2Fconnect%2Fdom%2FBindingIndependentConnector.java;h=b27c80d784e39fce232ae227a752f3dd3b77ab99;hp=75b0138e7cd75657adba42182d0dc7e0cf9c270c;hb=c55df06ea1182c1f73d09fe777fbc3c320d5e571;hpb=cbdf09d099f297db7712c3dd4637475c88f92113 diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java index 75b0138e7c..b27c80d784 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java @@ -1,9 +1,22 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.binding.impl.connect.dom; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import java.lang.ref.WeakReference; +import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -15,29 +28,38 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.opendaylight.controller.md.sal.binding.impl.AbstractForwardedDataBroker; import org.opendaylight.controller.md.sal.common.api.RegistrationListener; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; import org.opendaylight.controller.md.sal.common.api.data.DataModification; -import org.opendaylight.controller.md.sal.common.api.data.DataReader; import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider; +import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter; +import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl; +import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl; -import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; +import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener; +import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener; import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions; import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; +import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; +import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils; @@ -48,21 +70,28 @@ import org.opendaylight.yangtools.yang.binding.BindingMapping; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.binding.RpcService; import org.opendaylight.yangtools.yang.binding.util.BindingReflections; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; - -import static com.google.common.base.Preconditions.*; +import com.google.common.collect.ImmutableSet.Builder; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; public class BindingIndependentConnector implements // RuntimeDataProvider, // @@ -71,22 +100,22 @@ public class BindingIndependentConnector implements // private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class); - private static final InstanceIdentifier ROOT = InstanceIdentifier.builder().toInstance(); - private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier .builder().toInstance(); + private final static Method EQUALS_METHOD; + private BindingIndependentMappingService mappingService; private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService; private DataProviderService baDataService; - private ConcurrentMap domOpenedTransactions = new ConcurrentHashMap<>(); - private ConcurrentMap bindingOpenedTransactions = new ConcurrentHashMap<>(); + private final ConcurrentMap domOpenedTransactions = new ConcurrentHashMap<>(); + private final ConcurrentMap bindingOpenedTransactions = new ConcurrentHashMap<>(); - private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler(); - private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler(); + private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler(); + private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler(); private Registration, DataObject>> baCommitHandlerRegistration; @@ -99,35 +128,49 @@ public class BindingIndependentConnector implements // // private ListenerRegistration // bindingToDomRpcManager; - private Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() { + private final Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() { @Override - public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier input) { + public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier input) { return mappingService.toDataDom(input); } }; - private Registration, DataObject>> baDataReaderRegistration; - private boolean rpcForwarding = false; private boolean dataForwarding = false; private boolean notificationForwarding = false; + private RpcProviderRegistryImpl baRpcRegistryImpl; + + private NotificationProviderService baNotifyService; + + private NotificationPublishService domNotificationService; + + static { + try { + EQUALS_METHOD = Object.class.getMethod("equals", Object.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override - public DataObject readOperationalData(InstanceIdentifier path) { + public DataObject readOperationalData(final InstanceIdentifier path) { try { org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path); CompositeNode result = biDataService.readOperationalData(biPath); - return potentialAugmentationRead(path,biPath,result); + return potentialAugmentationRead(path, biPath, result); } catch (DeserializationException e) { throw new IllegalStateException(e); } } - private DataObject potentialAugmentationRead(InstanceIdentifier path, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result) throws DeserializationException { + private DataObject potentialAugmentationRead(InstanceIdentifier path, + final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, final CompositeNode result) + throws DeserializationException { Class targetType = path.getTargetType(); if (Augmentation.class.isAssignableFrom(targetType)) { path = mappingService.fromDataDom(biPath); @@ -141,46 +184,70 @@ public class BindingIndependentConnector implements // } @Override - public DataObject readConfigurationData(InstanceIdentifier path) { + public DataObject readConfigurationData(final InstanceIdentifier path) { try { org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path); CompositeNode result = biDataService.readConfigurationData(biPath); - return potentialAugmentationRead(path,biPath,result); + return potentialAugmentationRead(path, biPath, result); } catch (DeserializationException e) { throw new IllegalStateException(e); } } private DataModificationTransaction createBindingToDomTransaction( - DataModification, DataObject> source) { + final DataModification, DataObject> source) { DataModificationTransaction target = biDataService.beginTransaction(); + LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier()); + for (InstanceIdentifier entry : source.getRemovedConfigurationData()) { + org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); + target.removeConfigurationData(biEntry); + LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry); + } + for (InstanceIdentifier entry : source.getRemovedOperationalData()) { + org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); + target.removeOperationalData(biEntry); + LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry); + } for (Entry, DataObject> entry : source.getUpdatedConfigurationData() .entrySet()) { Entry biEntry = mappingService .toDataDom(entry); target.putConfigurationData(biEntry.getKey(), biEntry.getValue()); + LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry); } for (Entry, DataObject> entry : source.getUpdatedOperationalData() .entrySet()) { Entry biEntry = mappingService .toDataDom(entry); target.putOperationalData(biEntry.getKey(), biEntry.getValue()); + LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry); } - for (InstanceIdentifier entry : source.getRemovedConfigurationData()) { - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); - target.removeConfigurationData(biEntry); - } - for (InstanceIdentifier entry : source.getRemovedOperationalData()) { - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); - target.removeOperationalData(biEntry); - } + return target; } private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction( - DataModification source) { + final DataModification source) { org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService .beginTransaction(); + for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) { + try { + + InstanceIdentifier baEntry = mappingService.fromDataDom(entry); + target.removeConfigurationData(baEntry); + } catch (DeserializationException e) { + LOG.error("Ommiting from BA transaction: {}.", entry, e); + } + } + for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) { + try { + + InstanceIdentifier baEntry = mappingService.fromDataDom(entry); + target.removeOperationalData(baEntry); + } catch (DeserializationException e) { + LOG.error("Ommiting from BA transaction: {}.", entry, e); + } + } for (Entry entry : source .getUpdatedConfigurationData().entrySet()) { try { @@ -202,24 +269,6 @@ public class BindingIndependentConnector implements // LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e); } } - for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) { - try { - - InstanceIdentifier baEntry = mappingService.fromDataDom(entry); - target.removeConfigurationData(baEntry); - } catch (DeserializationException e) { - LOG.error("Ommiting from BA transaction: {}.", entry, e); - } - } - for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) { - try { - - InstanceIdentifier baEntry = mappingService.fromDataDom(entry); - target.removeOperationalData(baEntry); - } catch (DeserializationException e) { - LOG.error("Ommiting from BA transaction: {}.", entry, e); - } - } return target; } @@ -227,7 +276,8 @@ public class BindingIndependentConnector implements // return biDataService; } - protected void setDomDataService(org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) { + protected void setDomDataService( + final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) { this.biDataService = biDataService; } @@ -235,7 +285,7 @@ public class BindingIndependentConnector implements // return baDataService; } - protected void setBindingDataService(DataProviderService baDataService) { + protected void setBindingDataService(final DataProviderService baDataService) { this.baDataService = baDataService; } @@ -243,33 +293,58 @@ public class BindingIndependentConnector implements // return baRpcRegistry; } - protected void setBindingRpcRegistry(RpcProviderRegistry rpcRegistry) { + protected void setBindingRpcRegistry(final RpcProviderRegistry rpcRegistry) { this.baRpcRegistry = rpcRegistry; } public void startDataForwarding() { - checkState(!dataForwarding, "Connector is already forwarding data."); - baDataReaderRegistration = baDataService.registerDataReader(ROOT, this); - baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler); - biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler); - baDataService.registerCommitHandlerListener(domToBindingCommitHandler); + if (baDataService instanceof AbstractForwardedDataBroker) { + dataForwarding = true; + return; + } + + final DataProviderService baData; + if (baDataService instanceof BindingMountPointImpl) { + baData = ((BindingMountPointImpl) baDataService).getDataBrokerImpl(); + LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService); + } else { + baData = baDataService; + } + + if (baData instanceof DataBrokerImpl) { + checkState(!dataForwarding, "Connector is already forwarding data."); + ((DataBrokerImpl) baData).setDataReadDelegate(this); + ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler); + biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler); + baDataService.registerCommitHandlerListener(domToBindingCommitHandler); + } + dataForwarding = true; } - + public void startRpcForwarding() { - if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher) { - checkState(!rpcForwarding,"Connector is already forwarding RPCs"); + if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher) { + checkState(!rpcForwarding, "Connector is already forwarding RPCs"); domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager()); + if (baRpcRegistry instanceof RpcProviderRegistryImpl) { + baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry; + baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance()); + baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance()); + } rpcForwarding = true; } } - + public void startNotificationForwarding() { checkState(!notificationForwarding, "Connector is already forwarding notifications."); - notificationForwarding = true; + if (baNotifyService != null && domNotificationService != null) { + baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder()); + + notificationForwarding = true; + } } - protected void setMappingService(BindingIndependentMappingService mappingService) { + protected void setMappingService(final BindingIndependentMappingService mappingService) { this.mappingService = mappingService; } @@ -279,17 +354,17 @@ public class BindingIndependentConnector implements // } @Override - public void onSessionInitiated(ProviderSession session) { + public void onSessionInitiated(final ProviderSession session) { setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class)); setDomRpcRegistry(session.getService(RpcProvisionRegistry.class)); - + } - public void onRpcRouterCreated(Class serviceType, RpcRouter router) { + public void onRpcRouterCreated(final Class serviceType, final RpcRouter router) { } - public void setDomRpcRegistry(RpcProvisionRegistry registry) { + public void setDomRpcRegistry(final RpcProvisionRegistry registry) { biRpcRegistry = registry; } @@ -311,8 +386,8 @@ public class BindingIndependentConnector implements // private final DataModification modification; public DomToBindingTransaction( - org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing, - DataModification modification) { + final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing, + final DataModification modification) { super(); this.backing = backing; this.modification = modification; @@ -347,11 +422,11 @@ public class BindingIndependentConnector implements // private class BindingToDomTransaction implements DataCommitTransaction, DataObject> { - private DataModificationTransaction backing; - private DataModification, DataObject> modification; + private final DataModificationTransaction backing; + private final DataModification, DataObject> modification; - public BindingToDomTransaction(DataModificationTransaction backing, - DataModification, DataObject> modification) { + public BindingToDomTransaction(final DataModificationTransaction backing, + final DataModification, DataObject> modification) { this.backing = backing; this.modification = modification; domOpenedTransactions.put(backing.getIdentifier(), this); @@ -389,7 +464,7 @@ public class BindingIndependentConnector implements // @Override public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction, DataObject> requestCommit( - DataModification, DataObject> bindingTransaction) { + final DataModification, DataObject> bindingTransaction) { /** * Transaction was created as DOM transaction, in that case we do @@ -401,18 +476,19 @@ public class BindingIndependentConnector implements // } DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction); BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction); - LOG.info("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(), - domTransaction.getIdentifier()); + LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", + bindingTransaction.getIdentifier(), domTransaction.getIdentifier()); return wrapped; } } private class DomToBindingCommitHandler implements // - RegistrationListener, DataObject>>, // + RegistrationListener, DataObject>>, // DataCommitHandler { @Override - public void onRegister(DataCommitHandlerRegistration, DataObject> registration) { + public void onRegister( + final DataCommitHandlerRegistration, DataObject> registration) { org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration .getPath()); @@ -420,13 +496,15 @@ public class BindingIndependentConnector implements // } @Override - public void onUnregister(DataCommitHandlerRegistration, DataObject> registration) { + public void onUnregister( + final DataCommitHandlerRegistration, DataObject> registration) { // NOOP for now // FIXME: do registration based on only active commit handlers. } + @Override public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction requestCommit( - DataModification domTransaction) { + final DataModification domTransaction) { Object identifier = domTransaction.getIdentifier(); /** @@ -441,25 +519,56 @@ public class BindingIndependentConnector implements // org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction); DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction); - LOG.info("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(), + LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(), baTransaction.getIdentifier()); return forwardedTransaction; } } + /** + * Manager responsible for instantiating forwarders responsible for + * forwarding of RPC invocations from DOM Broker to Binding Aware Broker + * + */ private class DomToBindingRpcForwardingManager implements - RouteChangeListener> { + RouteChangeListener>, RouterInstantiationListener, + GlobalRpcRegistrationListener { private final Map, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>(); + private RpcProviderRegistryImpl registryImpl; + + public RpcProviderRegistryImpl getRegistryImpl() { + return registryImpl; + } + + public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) { + this.registryImpl = registryImpl; + } + + @Override + public void onGlobalRpcRegistered(final Class cls) { + getRpcForwarder(cls, null); + } + + @Override + public void onGlobalRpcUnregistered(final Class cls) { + // NOOP + } + + @Override + public void onRpcRouterCreated(final RpcRouter router) { + Class ctx = router.getContexts().iterator().next(); + getRpcForwarder(router.getServiceType(), ctx); + } @Override - public void onRouteChange(RouteChange> change) { + public void onRouteChange(final RouteChange> change) { for (Entry>> entry : change.getAnnouncements().entrySet()) { bindingRoutesAdded(entry); } } - private void bindingRoutesAdded(Entry>> entry) { + private void bindingRoutesAdded(final Entry>> entry) { Class context = entry.getKey().getRoutingContext(); Class service = entry.getKey().getRpcService(); if (context != null) { @@ -467,8 +576,8 @@ public class BindingIndependentConnector implements // } } - private DomToBindingRpcForwarder getRpcForwarder(Class service, - Class context) { + private DomToBindingRpcForwarder getRpcForwarder(final Class service, + final Class context) { DomToBindingRpcForwarder potential = forwarders.get(service); if (potential != null) { return potential; @@ -478,39 +587,66 @@ public class BindingIndependentConnector implements // } else { potential = new DomToBindingRpcForwarder(service, context); } + forwarders.put(service, potential); return potential; } } - private class DomToBindingRpcForwarder implements RpcImplementation { + private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler { private final Set supportedRpcs; private final WeakReference> rpcServiceType; - private Set registrations; + private final Set registrations; + private final Map strategiesByQName = new HashMap<>(); + private final WeakHashMap strategiesByMethod = new WeakHashMap<>(); - public DomToBindingRpcForwarder(Class service) { + public DomToBindingRpcForwarder(final Class service) { this.rpcServiceType = new WeakReference>(service); this.supportedRpcs = mappingService.getRpcQNamesFor(service); - for (QName rpc : supportedRpcs) { - biRpcRegistry.addRpcImplementation(rpc, this); + try { + for (QName rpc : supportedRpcs) { + RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service); + strategiesByMethod.put(strategy.targetMethod, strategy); + strategiesByQName.put(rpc, strategy); + biRpcRegistry.addRpcImplementation(rpc, this); + } + + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", service.getName(), e); } registrations = ImmutableSet.of(); } - public DomToBindingRpcForwarder(Class service, Class context) { + /** + * Constructor for Routed RPC Forwareder. + * + * @param service + * @param context + */ + public DomToBindingRpcForwarder(final Class service, + final Class context) { this.rpcServiceType = new WeakReference>(service); this.supportedRpcs = mappingService.getRpcQNamesFor(service); - registrations = new HashSet<>(); - for (QName rpc : supportedRpcs) { - registrations.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this)); + Builder registrationsBuilder = ImmutableSet + . builder(); + try { + for (QName rpc : supportedRpcs) { + RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service); + strategiesByMethod.put(strategy.targetMethod, strategy); + strategiesByQName.put(rpc, strategy); + registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this)); + } + createDefaultDomForwarder(); + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", service.getName(), e); } - registrations = ImmutableSet.copyOf(registrations); + registrations = registrationsBuilder.build(); } - public void registerPaths(Class context, Class service, - Set> set) { + public void registerPaths(final Class context, + final Class service, final Set> set) { QName ctx = BindingReflections.findQName(context); for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform( toDOMInstanceIdentifier)) { @@ -520,8 +656,23 @@ public class BindingIndependentConnector implements // } } - public void removePaths(Class context, Class service, - Set> set) { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + if (EQUALS_METHOD.equals(method)) { + return false; + } + RpcInvocationStrategy strategy = strategiesByMethod.get(method); + checkState(strategy != null); + checkArgument(args.length <= 2); + if (args.length == 1) { + checkArgument(args[0] instanceof DataObject); + return strategy.forwardToDomBroker((DataObject) args[0]); + } + return strategy.forwardToDomBroker(null); + } + + public void removePaths(final Class context, final Class service, + final Set> set) { QName ctx = BindingReflections.findQName(context); for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform( toDOMInstanceIdentifier)) { @@ -536,8 +687,20 @@ public class BindingIndependentConnector implements // return supportedRpcs; } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void createDefaultDomForwarder() { + if (baRpcRegistryImpl != null) { + Class cls = rpcServiceType.get(); + ClassLoader clsLoader = cls.getClassLoader(); + RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); + + RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get()); + rpcRouter.registerDefaultService(proxy); + } + } + @Override - public RpcResult invokeRpc(QName rpc, CompositeNode domInput) { + public ListenableFuture> invokeRpc(final QName rpc, final CompositeNode domInput) { checkArgument(rpc != null); checkArgument(domInput != null); @@ -546,14 +709,19 @@ public class BindingIndependentConnector implements // RpcService rpcService = baRpcRegistry.getRpcService(rpcType); checkState(rpcService != null); CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input")); + try { - return resolveInvocationStrategy(rpc, rpcType).invokeOn(rpcService, domUnwrappedInput); + return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput)); } catch (Exception e) { - throw new IllegalStateException(e); + return Futures.immediateFailedFuture(e); } } - private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc, + private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) { + return strategiesByQName.get(rpc); + } + + private RpcInvocationStrategy createInvocationStrategy(final QName rpc, final Class rpcType) throws Exception { return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable() { @Override @@ -575,12 +743,15 @@ public class BindingIndependentConnector implements // RpcInvocationStrategy strategy = null; if (outputClass.isPresent()) { if (inputClass.isPresent()) { - strategy = new DefaultInvocationStrategy(targetMethod, outputClass.get(), inputClass.get()); + strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass + .get()); } else { - strategy = new NoInputNoOutputInvocationStrategy(targetMethod); + strategy = new NoInputInvocationStrategy(rpc, targetMethod, outputClass.get()); } + } else if (inputClass.isPresent()) { + strategy = new NoOutputInvocationStrategy(rpc, targetMethod, inputClass.get()); } else { - strategy = null; + strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod); } return strategy; } @@ -592,15 +763,20 @@ public class BindingIndependentConnector implements // private abstract class RpcInvocationStrategy { protected final Method targetMethod; + protected final QName rpc; - public RpcInvocationStrategy(Method targetMethod) { + public RpcInvocationStrategy(final QName rpc, final Method targetMethod) { this.targetMethod = targetMethod; + this.rpc = rpc; } + public abstract Future> forwardToDomBroker(DataObject input); + public abstract RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception; - public RpcResult invokeOn(RpcService rpcService, CompositeNode domInput) throws Exception { + public RpcResult invokeOn(final RpcService rpcService, final CompositeNode domInput) + throws Exception { return uncheckedInvoke(rpcService, domInput); } } @@ -608,44 +784,174 @@ public class BindingIndependentConnector implements // private class DefaultInvocationStrategy extends RpcInvocationStrategy { @SuppressWarnings("rawtypes") - private WeakReference inputClass; + private final WeakReference inputClass; @SuppressWarnings("rawtypes") - private WeakReference outputClass; + private final WeakReference outputClass; @SuppressWarnings({ "rawtypes", "unchecked" }) - public DefaultInvocationStrategy(Method targetMethod, Class outputClass, - Class inputClass) { - super(targetMethod); + public DefaultInvocationStrategy(final QName rpc, final Method targetMethod, final Class outputClass, + final Class inputClass) { + super(rpc, targetMethod); this.outputClass = new WeakReference(outputClass); this.inputClass = new WeakReference(inputClass); } + @SuppressWarnings("unchecked") @Override - public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { + public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) + throws Exception { DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); - Future> result = (Future>) targetMethod.invoke(rpcService, bindingInput); - if (result == null) { + Future> futureResult = (Future>) targetMethod.invoke(rpcService, bindingInput); + if (futureResult == null) { return Rpcs.getRpcResult(false); } - RpcResult bindingResult = result.get(); + RpcResult bindingResult = futureResult.get(); + final Object resultObj = bindingResult.getResult(); + if (resultObj instanceof DataObject) { + final CompositeNode output = mappingService.toDataDom((DataObject) resultObj); + return Rpcs.getRpcResult(true, output, Collections. emptySet()); + } + return Rpcs.getRpcResult(true); + } + + @Override + public ListenableFuture> forwardToDomBroker(final DataObject input) { + if (biRpcRegistry == null) { + return Futures.> immediateFuture(Rpcs.getRpcResult(false)); + } + + CompositeNode xml = mappingService.toDataDom(input); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); + + return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), + new Function, RpcResult>() { + @Override + public RpcResult apply(final RpcResult input) { + Object baResultValue = null; + if (input.getResult() != null) { + baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), + input.getResult()); + } + return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors()); + } + }); + } + } + + private class NoInputInvocationStrategy extends RpcInvocationStrategy { + + @SuppressWarnings("rawtypes") + private final WeakReference outputClass; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public NoInputInvocationStrategy(final QName rpc, final Method targetMethod, final Class outputClass) { + super(rpc, targetMethod); + this.outputClass = new WeakReference(outputClass); + } + + @SuppressWarnings("unchecked") + @Override + public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) + throws Exception { + Future> futureResult = (Future>) targetMethod.invoke(rpcService); + if (futureResult == null) { + return Rpcs.getRpcResult(false); + } + RpcResult bindingResult = futureResult.get(); + final Object resultObj = bindingResult.getResult(); + if (resultObj instanceof DataObject) { + final CompositeNode output = mappingService.toDataDom((DataObject) resultObj); + return Rpcs.getRpcResult(true, output, Collections. emptySet()); + } return Rpcs.getRpcResult(true); } + @Override + public Future> forwardToDomBroker(final DataObject input) { + if (biRpcRegistry != null) { + CompositeNode xml = mappingService.toDataDom(input); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); + return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), + new Function, RpcResult>() { + @Override + public RpcResult apply(final RpcResult input) { + Object baResultValue = null; + if (input.getResult() != null) { + baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), + input.getResult()); + } + return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors()); + } + }); + } else { + return Futures.> immediateFuture(Rpcs.getRpcResult(false)); + } + } } private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy { - public NoInputNoOutputInvocationStrategy(Method targetMethod) { - super(targetMethod); + public NoInputNoOutputInvocationStrategy(final QName rpc, final Method targetMethod) { + super(rpc, targetMethod); } - public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { + @Override + public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) + throws Exception { @SuppressWarnings("unchecked") Future> result = (Future>) targetMethod.invoke(rpcService); RpcResult bindingResult = result.get(); return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors()); } + + @Override + public Future> forwardToDomBroker(final DataObject input) { + return Futures.immediateFuture(null); + } + } + + private class NoOutputInvocationStrategy extends RpcInvocationStrategy { + + @SuppressWarnings("rawtypes") + private final WeakReference inputClass; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod, + final Class inputClass) { + super(rpc, targetMethod); + this.inputClass = new WeakReference(inputClass); + } + + @Override + public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) + throws Exception { + DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); + Future> result = (Future>) targetMethod.invoke(rpcService, bindingInput); + if (result == null) { + return Rpcs.getRpcResult(false); + } + RpcResult bindingResult = result.get(); + return Rpcs.getRpcResult(true); + } + + @Override + public ListenableFuture> forwardToDomBroker(final DataObject input) { + if (biRpcRegistry == null) { + return Futures.> immediateFuture(Rpcs.getRpcResult(false)); + } + + CompositeNode xml = mappingService.toDataDom(input); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); + + return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), + new Function, RpcResult>() { + @Override + public RpcResult apply(final RpcResult input) { + return Rpcs. getRpcResult(input.isSuccessful(), null, input.getErrors()); + } + }); + } } public boolean isRpcForwarding() { @@ -657,11 +963,60 @@ public class BindingIndependentConnector implements // } public boolean isNotificationForwarding() { - // TODO Auto-generated method stub return notificationForwarding; } public BindingIndependentMappingService getMappingService() { return mappingService; } + + public void setBindingNotificationService(final NotificationProviderService baService) { + this.baNotifyService = baService; + + } + + public void setDomNotificationService(final NotificationPublishService domService) { + this.domNotificationService = domService; + } + + private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener { + + private final ConcurrentMap>> notifications = new ConcurrentHashMap<>(); + private final Set supportedNotifications = new HashSet<>(); + + @Override + public Set getSupportedNotifications() { + return Collections.unmodifiableSet(supportedNotifications); + } + + @Override + public void onNotification(final CompositeNode notification) { + QName qname = notification.getNodeType(); + WeakReference> potential = notifications.get(qname); + if (potential != null) { + Class potentialClass = potential.get(); + if (potentialClass != null) { + final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass, + notification); + + if (baNotification instanceof Notification) { + baNotifyService.publish((Notification) baNotification); + } + } + } + } + + @Override + public void onNotificationSubscribtion(final Class notificationType) { + QName qname = BindingReflections.findQName(notificationType); + if (qname != null) { + WeakReference> already = notifications.putIfAbsent(qname, + new WeakReference>(notificationType)); + if (already == null) { + domNotificationService.addNotificationListener(qname, this); + supportedNotifications.add(qname); + } + } + } + } }