From 7e1d2b4f9e138e9e31b7fa60d69c9d13bafc9728 Mon Sep 17 00:00:00 2001 From: Lukas Sedlak Date: Thu, 10 Jul 2014 10:16:46 +0200 Subject: [PATCH] Bug 1303: BindingIndependentConnector splitted. Inner private classes in BindingIndependentConnector extracted into standalone package protected classes. Fixed bug in BindingToDomTransaction class - domOpenedTransactions management. Change-Id: I69173db2bf4e6746eac4fa67c57f65095f45c603 Signed-off-by: Lukas Sedlak --- .../dom/BindingIndependentConnector.java | 693 +----------------- .../dom/BindingToDomCommitHandler.java | 99 +++ .../connect/dom/BindingToDomTransaction.java | 59 ++ .../dom/DomToBindingCommitHandler.java | 135 ++++ .../DomToBindingNotificationForwarder.java | 70 ++ .../connect/dom/DomToBindingRpcForwarder.java | 279 +++++++ .../dom/DomToBindingRpcForwardingManager.java | 113 +++ .../connect/dom/DomToBindingTransaction.java | 60 ++ 8 files changed, 851 insertions(+), 657 deletions(-) create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomCommitHandler.java create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomTransaction.java create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingCommitHandler.java create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingNotificationForwarder.java create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwarder.java create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwardingManager.java create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingTransaction.java diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java index 6e4b2d8d99..71253d02d6 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java @@ -7,142 +7,67 @@ */ package org.opendaylight.controller.sal.binding.impl.connect.dom; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import java.lang.ref.WeakReference; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.opendaylight.controller.md.sal.binding.impl.AbstractForwardedDataBroker; -import org.opendaylight.controller.md.sal.common.api.RegistrationListener; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; -import org.opendaylight.controller.md.sal.common.api.data.DataModification; -import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; -import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider; -import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; -import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter; import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl; import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl; -import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener; -import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener; -import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions; -import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; import org.opendaylight.controller.sal.core.api.Provider; -import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; -import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; -import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.core.api.notify.NotificationListener; import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration; -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration.CompositeObjectRegistrationBuilder; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.Augmentable; import org.opendaylight.yangtools.yang.binding.Augmentation; -import org.opendaylight.yangtools.yang.binding.BaseIdentity; -import org.opendaylight.yangtools.yang.binding.BindingMapping; -import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.Notification; -import org.opendaylight.yangtools.yang.binding.RpcService; -import org.opendaylight.yangtools.yang.binding.util.BindingReflections; -import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - public class BindingIndependentConnector implements // RuntimeDataProvider, // Provider, // AutoCloseable { - private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class); - + private static final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class); private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier .builder().toInstance(); - private final static Method EQUALS_METHOD; - private BindingIndependentMappingService mappingService; - private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService; - private DataProviderService baDataService; - private final ConcurrentMap domOpenedTransactions = new ConcurrentHashMap<>(); - private final ConcurrentMap bindingOpenedTransactions = new ConcurrentHashMap<>(); - - private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler(); - private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler(); - - private Registration, DataObject>> baCommitHandlerRegistration; + private final ConcurrentMap domOpenedTransactions; + private final ConcurrentMap bindingOpenedTransactions; + private final BindingToDomCommitHandler bindingToDomCommitHandler; + private final DomToBindingCommitHandler domToBindingCommitHandler; private Registration> biCommitHandlerRegistration; - private RpcProvisionRegistry biRpcRegistry; private RpcProviderRegistry baRpcRegistry; private ListenerRegistration domToBindingRpcManager; - // private ListenerRegistration - // bindingToDomRpcManager; - - private final Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() { - @Override - public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier input) { - return mappingService.toDataDom(input); - } - - }; - - private boolean rpcForwarding = false; - - private boolean dataForwarding = false; - - private boolean notificationForwarding = false; + private boolean rpcForwarding; + private boolean dataForwarding; + private boolean notificationForwarding; private RpcProviderRegistryImpl baRpcRegistryImpl; @@ -150,12 +75,15 @@ public class BindingIndependentConnector implements // private NotificationPublishService domNotificationService; - static { - try { - EQUALS_METHOD = Object.class.getMethod("equals", Object.class); - } catch (Exception e) { - throw new RuntimeException(e); - } + public BindingIndependentConnector() { + domOpenedTransactions = new ConcurrentHashMap<>(); + bindingOpenedTransactions = new ConcurrentHashMap<>(); + + bindingToDomCommitHandler = new BindingToDomCommitHandler(bindingOpenedTransactions, domOpenedTransactions); + domToBindingCommitHandler = new DomToBindingCommitHandler(bindingOpenedTransactions, domOpenedTransactions); + rpcForwarding = false; + dataForwarding = false; + notificationForwarding = false; } @Override @@ -195,84 +123,6 @@ public class BindingIndependentConnector implements // } } - private DataModificationTransaction createBindingToDomTransaction( - final DataModification, DataObject> source) { - DataModificationTransaction target = biDataService.beginTransaction(); - LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier()); - for (InstanceIdentifier entry : source.getRemovedConfigurationData()) { - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); - target.removeConfigurationData(biEntry); - LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry); - } - for (InstanceIdentifier entry : source.getRemovedOperationalData()) { - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); - target.removeOperationalData(biEntry); - LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry); - } - for (Entry, DataObject> entry : source.getUpdatedConfigurationData() - .entrySet()) { - Entry biEntry = mappingService - .toDataDom(entry); - target.putConfigurationData(biEntry.getKey(), biEntry.getValue()); - LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry); - } - for (Entry, DataObject> entry : source.getUpdatedOperationalData() - .entrySet()) { - Entry biEntry = mappingService - .toDataDom(entry); - target.putOperationalData(biEntry.getKey(), biEntry.getValue()); - LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry); - } - - return target; - } - - private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction( - final DataModification source) { - org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService - .beginTransaction(); - for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) { - try { - - InstanceIdentifier baEntry = mappingService.fromDataDom(entry); - target.removeConfigurationData(baEntry); - } catch (DeserializationException e) { - LOG.error("Ommiting from BA transaction: {}.", entry, e); - } - } - for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) { - try { - - InstanceIdentifier baEntry = mappingService.fromDataDom(entry); - target.removeOperationalData(baEntry); - } catch (DeserializationException e) { - LOG.error("Ommiting from BA transaction: {}.", entry, e); - } - } - for (Entry entry : source - .getUpdatedConfigurationData().entrySet()) { - try { - InstanceIdentifier baKey = mappingService.fromDataDom(entry.getKey()); - DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue()); - target.putConfigurationData(baKey, baData); - } catch (DeserializationException e) { - LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e); - } - } - for (Entry entry : source - .getUpdatedOperationalData().entrySet()) { - try { - - InstanceIdentifier baKey = mappingService.fromDataDom(entry.getKey()); - DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue()); - target.putOperationalData(baKey, baData); - } catch (DeserializationException e) { - LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e); - } - } - return target; - } - public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() { return biDataService; } @@ -280,6 +130,7 @@ public class BindingIndependentConnector implements // protected void setDomDataService( final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) { this.biDataService = biDataService; + bindingToDomCommitHandler.setBindingIndependentDataService(this.biDataService); } public DataProviderService getBaDataService() { @@ -288,6 +139,7 @@ public class BindingIndependentConnector implements // protected void setBindingDataService(final DataProviderService baDataService) { this.baDataService = baDataService; + domToBindingCommitHandler.setBindingAwareDataService(this.baDataService); } public RpcProviderRegistry getRpcRegistry() { @@ -323,10 +175,12 @@ public class BindingIndependentConnector implements // dataForwarding = true; } + //WTF? - cycle references to biFwdManager - need to solve :-/ public void startRpcForwarding() { + checkNotNull(mappingService, "Unable to start Rpc forwarding. Reason: Mapping Service is not initialized properly!"); if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher) { checkState(!rpcForwarding, "Connector is already forwarding RPCs"); - final DomToBindingRpcForwardingManager biFwdManager = new DomToBindingRpcForwardingManager(); + final DomToBindingRpcForwardingManager biFwdManager = new DomToBindingRpcForwardingManager(mappingService, biRpcRegistry, baRpcRegistry); domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(biFwdManager); biRpcRegistry.addRpcRegistrationListener(biFwdManager); @@ -334,6 +188,7 @@ public class BindingIndependentConnector implements // baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry; baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance()); baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance()); + biFwdManager.setRegistryImpl(baRpcRegistryImpl); } rpcForwarding = true; } @@ -341,15 +196,23 @@ public class BindingIndependentConnector implements // public void startNotificationForwarding() { checkState(!notificationForwarding, "Connector is already forwarding notifications."); - if (baNotifyService != null && domNotificationService != null) { - baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder()); - + if (mappingService == null) { + LOG.warn("Unable to start Notification forwarding. Reason: Mapping Service is not initialized properly!"); + } else if (baNotifyService == null) { + LOG.warn("Unable to start Notification forwarding. Reason: Binding Aware Notify Service is not initialized properly!"); + } else if (domNotificationService == null) { + LOG.warn("Unable to start Notification forwarding. Reason: DOM Notification Service is not initialized properly!"); + } else { + baNotifyService.registerInterestListener( + new DomToBindingNotificationForwarder(mappingService, baNotifyService, domNotificationService)); notificationForwarding = true; } } protected void setMappingService(final BindingIndependentMappingService mappingService) { this.mappingService = mappingService; + bindingToDomCommitHandler.setMappingService(this.mappingService); + domToBindingCommitHandler.setMappingService(this.mappingService); } @Override @@ -364,458 +227,15 @@ public class BindingIndependentConnector implements // } - public void onRpcRouterCreated(final Class serviceType, final RpcRouter router) { - - } - public void setDomRpcRegistry(final RpcProvisionRegistry registry) { biRpcRegistry = registry; } @Override public void close() throws Exception { - if (baCommitHandlerRegistration != null) { - baCommitHandlerRegistration.close(); - } if (biCommitHandlerRegistration != null) { biCommitHandlerRegistration.close(); } - - } - - private class DomToBindingTransaction implements - DataCommitTransaction { - - private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing; - private final DataModification modification; - - public DomToBindingTransaction( - final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing, - final DataModification modification) { - super(); - this.backing = backing; - this.modification = modification; - bindingOpenedTransactions.put(backing.getIdentifier(), this); - } - - @Override - public DataModification getModification() { - return modification; - } - - @Override - public RpcResult rollback() throws IllegalStateException { - // backing.cancel(); - return Rpcs. getRpcResult(true, null, Collections. emptySet()); - } - - @Override - public RpcResult finish() throws IllegalStateException { - Future> result = backing.commit(); - try { - RpcResult baResult = result.get(); - return Rpcs. getRpcResult(baResult.isSuccessful(), null, baResult.getErrors()); - } catch (InterruptedException e) { - throw new IllegalStateException("", e); - } catch (ExecutionException e) { - throw new IllegalStateException("", e); - } - } - } - - private class BindingToDomTransaction implements - DataCommitTransaction, DataObject> { - - private final DataModificationTransaction backing; - private final DataModification, DataObject> modification; - - public BindingToDomTransaction(final DataModificationTransaction backing, - final DataModification, DataObject> modification) { - this.backing = backing; - this.modification = modification; - domOpenedTransactions.put(backing.getIdentifier(), this); - } - - @Override - public DataModification, DataObject> getModification() { - return modification; - } - - @Override - public RpcResult finish() throws IllegalStateException { - Future> result = backing.commit(); - try { - RpcResult biResult = result.get(); - return Rpcs. getRpcResult(biResult.isSuccessful(), null, biResult.getErrors()); - } catch (InterruptedException e) { - throw new IllegalStateException("", e); - } catch (ExecutionException e) { - throw new IllegalStateException("", e); - } finally { - domOpenedTransactions.remove(backing.getIdentifier()); - } - } - - @Override - public RpcResult rollback() throws IllegalStateException { - domOpenedTransactions.remove(backing.getIdentifier()); - return Rpcs. getRpcResult(true, null, Collections. emptySet()); - } - } - - private class BindingToDomCommitHandler implements - DataCommitHandler, DataObject> { - - @Override - public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction, DataObject> requestCommit( - final DataModification, DataObject> bindingTransaction) { - - /** - * Transaction was created as DOM transaction, in that case we do - * not need to forward it back. - */ - if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) { - - return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction); - } - DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction); - BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction); - LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", - bindingTransaction.getIdentifier(), domTransaction.getIdentifier()); - return wrapped; - } - } - - private class DomToBindingCommitHandler implements // - RegistrationListener, DataObject>>, // - DataCommitHandler { - - @Override - public void onRegister( - final DataCommitHandlerRegistration, DataObject> registration) { - - mappingService.toDataDom(registration - .getPath()); - - } - - @Override - public void onUnregister( - final DataCommitHandlerRegistration, DataObject> registration) { - // NOOP for now - // FIXME: do registration based on only active commit handlers. - } - - @Override - public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction requestCommit( - final DataModification domTransaction) { - Object identifier = domTransaction.getIdentifier(); - - /** - * We checks if the transcation was originated in this mapper. If it - * was originated in this mapper we are returing allways success - * commit hanlder to prevent creating loop in two-phase commit and - * duplicating data. - */ - if (domOpenedTransactions.containsKey(identifier)) { - return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction); - } - - org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction); - DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction); - LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(), - baTransaction.getIdentifier()); - return forwardedTransaction; - } - } - - /** - * Manager responsible for instantiating forwarders responsible for - * forwarding of RPC invocations from DOM Broker to Binding Aware Broker - * - */ - private class DomToBindingRpcForwardingManager implements - RouteChangeListener>, RouterInstantiationListener, - GlobalRpcRegistrationListener, RpcRegistrationListener { - - private final Map, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>(); - private RpcProviderRegistryImpl registryImpl; - - public RpcProviderRegistryImpl getRegistryImpl() { - return registryImpl; - } - - public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) { - this.registryImpl = registryImpl; - } - - @Override - public void onGlobalRpcRegistered(final Class cls) { - getRpcForwarder(cls, null).registerToDOMBroker(); - } - - @Override - public void onGlobalRpcUnregistered(final Class cls) { - // NOOP - } - - @Override - public void onRpcRouterCreated(final RpcRouter router) { - Class ctx = router.getContexts().iterator().next(); - getRpcForwarder(router.getServiceType(), ctx); - } - - @Override - public void onRouteChange(final RouteChange> change) { - for (Entry>> entry : change.getAnnouncements().entrySet()) { - bindingRoutesAdded(entry); - } - } - - private void bindingRoutesAdded(final Entry>> entry) { - Class context = entry.getKey().getRoutingContext(); - Class service = entry.getKey().getRpcService(); - if (context != null) { - getRpcForwarder(service, context).registerPaths(context, service, entry.getValue()); - } - } - - private DomToBindingRpcForwarder getRpcForwarder(final Class service, - final Class context) { - DomToBindingRpcForwarder potential = forwarders.get(service); - if (potential != null) { - return potential; - } - if (context == null) { - potential = new DomToBindingRpcForwarder(service); - } else { - potential = new DomToBindingRpcForwarder(service, context); - } - - forwarders.put(service, potential); - return potential; - } - - @Override - public void onRpcImplementationAdded(final QName name) { - - final Optional> rpcInterface = mappingService.getRpcServiceClassFor( - name.getNamespace().toString(), name.getFormattedRevision()); - if (rpcInterface.isPresent()) { - getRpcForwarder(rpcInterface.get(), null).registerToBindingBroker(); - } - } - - @Override - public void onRpcImplementationRemoved(final QName name) { - - } - } - - private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler { - - private final Set supportedRpcs; - private final WeakReference> rpcServiceType; - private Set registrations; - private final Map strategiesByQName = new HashMap<>(); - private final WeakHashMap strategiesByMethod = new WeakHashMap<>(); - private final RpcService proxy; - private ObjectRegistration forwarderRegistration; - private boolean registrationInProgress = false; - - public DomToBindingRpcForwarder(final Class service) { - this.rpcServiceType = new WeakReference>(service); - this.supportedRpcs = mappingService.getRpcQNamesFor(service); - - Class cls = rpcServiceType.get(); - ClassLoader clsLoader = cls.getClassLoader(); - proxy =(RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); - createStrategies(); - } - - /** - * Constructor for Routed RPC Forwareder. - * - * @param service - * @param context - */ - public DomToBindingRpcForwarder(final Class service, - final Class context) { - this(service); - Builder registrationsBuilder = ImmutableSet - . builder(); - try { - for (QName rpc : supportedRpcs) { - registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this)); - } - createDefaultDomForwarder(); - } catch (Exception e) { - LOG.error("Could not forward Rpcs of type {}", service.getName(), e); - } - registrations = registrationsBuilder.build(); - } - - - - private void createStrategies() { - try { - for (QName rpc : supportedRpcs) { - RpcInvocationStrategy strategy = createInvocationStrategy(rpc, rpcServiceType.get()); - strategiesByMethod.put(strategy.targetMethod, strategy); - strategiesByQName.put(rpc, strategy); - } - } catch (Exception e) { - LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e); - } - - } - - /** - * Registers RPC Forwarder to DOM Broker, - * this means Binding Aware Broker has implementation of RPC - * which is registered to it. - * - * If RPC Forwarder was previously registered to DOM Broker - * or to Bidning Broker this method is noop to prevent - * creating forwarding loop. - * - */ - public void registerToDOMBroker() { - if(!registrationInProgress && forwarderRegistration == null) { - registrationInProgress = true; - CompositeObjectRegistrationBuilder builder = CompositeObjectRegistration.builderFor(this); - try { - for (QName rpc : supportedRpcs) { - builder.add(biRpcRegistry.addRpcImplementation(rpc, this)); - } - } catch (Exception e) { - LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e); - } - this.forwarderRegistration = builder.toInstance(); - registrationInProgress = false; - } - } - - - public void registerPaths(final Class context, - final Class service, final Set> set) { - QName ctx = BindingReflections.findQName(context); - for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform( - toDOMInstanceIdentifier)) { - for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) { - reg.registerPath(ctx, path); - } - } - } - - @Override - public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { - if (EQUALS_METHOD.equals(method)) { - return false; - } - RpcInvocationStrategy strategy = strategiesByMethod.get(method); - checkState(strategy != null); - checkArgument(args.length <= 2); - if (args.length == 1) { - checkArgument(args[0] instanceof DataObject); - return strategy.forwardToDomBroker((DataObject) args[0]); - } - return strategy.forwardToDomBroker(null); - } - - public void removePaths(final Class context, final Class service, - final Set> set) { - QName ctx = BindingReflections.findQName(context); - for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform( - toDOMInstanceIdentifier)) { - for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) { - reg.unregisterPath(ctx, path); - } - } - } - - @Override - public Set getSupportedRpcs() { - return supportedRpcs; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void createDefaultDomForwarder() { - if (baRpcRegistryImpl != null) { - Class cls = rpcServiceType.get(); - ClassLoader clsLoader = cls.getClassLoader(); - RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); - - RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get()); - rpcRouter.registerDefaultService(proxy); - } - } - - @Override - public ListenableFuture> invokeRpc(final QName rpc, final CompositeNode domInput) { - checkArgument(rpc != null); - checkArgument(domInput != null); - - Class rpcType = rpcServiceType.get(); - checkState(rpcType != null); - RpcService rpcService = baRpcRegistry.getRpcService(rpcType); - checkState(rpcService != null); - CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input")); - - try { - return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput)); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } - } - - private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) { - return strategiesByQName.get(rpc); - } - - private RpcInvocationStrategy createInvocationStrategy(final QName rpc, - final Class rpcType) throws Exception { - return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable() { - @Override - public RpcInvocationStrategy call() throws Exception { - String methodName = BindingMapping.getMethodName(rpc); - Method targetMethod = null; - for (Method possibleMethod : rpcType.getMethods()) { - if (possibleMethod.getName().equals(methodName) - && BindingReflections.isRpcMethod(possibleMethod)) { - targetMethod = possibleMethod; - break; - } - } - checkState(targetMethod != null, "Rpc method not found"); - return new RpcInvocationStrategy(rpc,targetMethod, mappingService, biRpcRegistry); - } - - }); - } - - /** - * Registers RPC Forwarder to Binding Broker, - * this means DOM Broekr has implementation of RPC - * which is registered to it. - * - * If RPC Forwarder was previously registered to DOM Broker - * or to Bidning Broker this method is noop to prevent - * creating forwarding loop. - * - */ - public void registerToBindingBroker() { - if(!registrationInProgress && forwarderRegistration == null) { - try { - registrationInProgress = true; - this.forwarderRegistration = baRpcRegistry.addRpcImplementation((Class)rpcServiceType.get(), proxy); - } catch (Exception e) { - LOG.error("Unable to forward RPCs for {}",rpcServiceType.get(),e); - } finally { - registrationInProgress = false; - } - } - } } public boolean isRpcForwarding() { @@ -842,45 +262,4 @@ public class BindingIndependentConnector implements // public void setDomNotificationService(final NotificationPublishService domService) { this.domNotificationService = domService; } - - private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener { - - private final ConcurrentMap>> notifications = new ConcurrentHashMap<>(); - private final Set supportedNotifications = new HashSet<>(); - - @Override - public Set getSupportedNotifications() { - return Collections.unmodifiableSet(supportedNotifications); - } - - @Override - public void onNotification(final CompositeNode notification) { - QName qname = notification.getNodeType(); - WeakReference> potential = notifications.get(qname); - if (potential != null) { - Class potentialClass = potential.get(); - if (potentialClass != null) { - final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass, - notification); - - if (baNotification instanceof Notification) { - baNotifyService.publish((Notification) baNotification); - } - } - } - } - - @Override - public void onNotificationSubscribtion(final Class notificationType) { - QName qname = BindingReflections.findQName(notificationType); - if (qname != null) { - WeakReference> already = notifications.putIfAbsent(qname, - new WeakReference>(notificationType)); - if (already == null) { - domNotificationService.addNotificationListener(qname, this); - supportedNotifications.add(qname); - } - } - } - } } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomCommitHandler.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomCommitHandler.java new file mode 100644 index 0000000000..ee0628308e --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomCommitHandler.java @@ -0,0 +1,99 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions; +import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; +import org.opendaylight.controller.sal.core.api.data.DataProviderService; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class BindingToDomCommitHandler implements + DataCommitHandler, DataObject> { + + private final Logger LOG = LoggerFactory.getLogger(BindingToDomCommitHandler.class); + + private final ConcurrentMap bindingOpenedTransactions; + private final ConcurrentMap domOpenedTransactions; + private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService; + private BindingIndependentMappingService mappingService; + + BindingToDomCommitHandler(final ConcurrentMap bindingOpenedTransactions, + final ConcurrentMap domOpenedTransactions) { + this.bindingOpenedTransactions = bindingOpenedTransactions; + this.domOpenedTransactions = domOpenedTransactions; + } + + public void setBindingIndependentDataService(final DataProviderService biDataService) { + this.biDataService = biDataService; + } + + public void setMappingService(BindingIndependentMappingService mappingService) { + this.mappingService = mappingService; + } + + @Override + public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction, DataObject> requestCommit( + final DataModification, DataObject> bindingTransaction) { + + /** + * Transaction was created as DOM transaction, in that case we do + * not need to forward it back. + */ + if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) { + return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction); + } + DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction); + BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction, domOpenedTransactions); + LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", + bindingTransaction.getIdentifier(), domTransaction.getIdentifier()); + return wrapped; + } + + private DataModificationTransaction createBindingToDomTransaction( + final DataModification, DataObject> source) { + if (biDataService == null) { + final String msg = "Binding Independent Service is not initialized correctly! Binding to DOM Transaction cannot be created for "; + LOG.error(msg + "{}", source); + throw new IllegalStateException(msg + source); + } + if (mappingService == null) { + final String msg = "Mapping Service is not initialized correctly! Binding to DOM Transaction cannot be created for "; + LOG.error(msg + "{}", source); + throw new IllegalStateException(msg + source); + } + DataModificationTransaction target = biDataService.beginTransaction(); + LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier()); + for (InstanceIdentifier entry : source.getRemovedConfigurationData()) { + org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); + target.removeConfigurationData(biEntry); + LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry); + } + for (InstanceIdentifier entry : source.getRemovedOperationalData()) { + org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry); + target.removeOperationalData(biEntry); + LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry); + } + for (Map.Entry, DataObject> entry : source.getUpdatedConfigurationData() + .entrySet()) { + Map.Entry biEntry = mappingService + .toDataDom(entry); + target.putConfigurationData(biEntry.getKey(), biEntry.getValue()); + LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry); + } + for (Map.Entry, DataObject> entry : source.getUpdatedOperationalData() + .entrySet()) { + Map.Entry biEntry = mappingService + .toDataDom(entry); + target.putOperationalData(biEntry.getKey(), biEntry.getValue()); + LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry); + } + return target; + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomTransaction.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomTransaction.java new file mode 100644 index 0000000000..20a786e774 --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingToDomTransaction.java @@ -0,0 +1,59 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import java.util.Collections; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; + +class BindingToDomTransaction implements + DataCommitHandler.DataCommitTransaction, DataObject> { + + private final DataModificationTransaction backing; + private final DataModification, DataObject> modification; + private final ConcurrentMap domOpenedTransactions; + + public BindingToDomTransaction(final DataModificationTransaction backing, + final DataModification, DataObject> modification, + ConcurrentMap domOpenedTransactions) { + this.backing = backing; + this.modification = modification; + this.domOpenedTransactions = domOpenedTransactions; + this.domOpenedTransactions.put(backing.getIdentifier(), this); + } + + @Override + public DataModification, DataObject> getModification() { + return modification; + } + + @Override + public RpcResult finish() throws IllegalStateException { + Future> result = backing.commit(); + try { + RpcResult biResult = result.get(); + domOpenedTransactions.remove(backing.getIdentifier()); + return Rpcs.getRpcResult(biResult.isSuccessful(), null, biResult.getErrors()); + } catch (InterruptedException e) { + throw new IllegalStateException("", e); + } catch (ExecutionException e) { + throw new IllegalStateException("", e); + } finally { + domOpenedTransactions.remove(backing.getIdentifier()); + } + } + + @Override + public RpcResult rollback() throws IllegalStateException { + domOpenedTransactions.remove(backing.getIdentifier()); + return Rpcs.getRpcResult(true, null, Collections. emptySet()); + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingCommitHandler.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingCommitHandler.java new file mode 100644 index 0000000000..395af8f487 --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingCommitHandler.java @@ -0,0 +1,135 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import org.opendaylight.controller.md.sal.common.api.RegistrationListener; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.controller.sal.binding.api.data.DataProviderService; +import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DomToBindingCommitHandler implements // + RegistrationListener, DataObject>>, // + DataCommitHandler { + + private final Logger LOG = LoggerFactory.getLogger(DomToBindingCommitHandler.class); + + private final ConcurrentMap bindingOpenedTransactions; + private final ConcurrentMap domOpenedTransactions; + + DomToBindingCommitHandler(final ConcurrentMap bindingOpenedTransactions, + final ConcurrentMap domOpenedTransactions) { + this.bindingOpenedTransactions = bindingOpenedTransactions; + this.domOpenedTransactions = domOpenedTransactions; + } + + private DataProviderService baDataService; + private BindingIndependentMappingService mappingService; + + public void setBindingAwareDataService(DataProviderService baDataService) { + this.baDataService = baDataService; + } + + public void setMappingService(BindingIndependentMappingService mappingService) { + this.mappingService = mappingService; + } + + @Override + public void onRegister(final DataCommitHandlerRegistration, DataObject> registration) { + mappingService.toDataDom(registration.getPath()); + } + + @Override + public void onUnregister( + final DataCommitHandlerRegistration, DataObject> registration) { + // NOOP for now + // FIXME: do registration based on only active commit handlers. + } + + @Override + public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction requestCommit( + final DataModification domTransaction) { + Object identifier = domTransaction.getIdentifier(); + + /** + * We checks if the transcation was originated in this mapper. If it + * was originated in this mapper we are returing allways success + * commit hanlder to prevent creating loop in two-phase commit and + * duplicating data. + */ + if (domOpenedTransactions.containsKey(identifier)) { + return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction); + } + + org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction); + DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction, bindingOpenedTransactions); + LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(), + baTransaction.getIdentifier()); + return forwardedTransaction; + } + + private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction( + final DataModification source) { + if (baDataService == null) { + final String msg = "Binding Aware Service is not initialized correctly! DOM to Binding Transaction cannot be created for "; + LOG.error(msg + "{}", source); + throw new IllegalStateException(msg + source); + } + if (mappingService == null) { + final String msg = "Mapping Service is not initialized correctly! DOM to Binding Transaction cannot be created for "; + LOG.error(msg + "{}", source); + throw new IllegalStateException(msg + source); + } + + org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService + .beginTransaction(); + for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) { + try { + + InstanceIdentifier baEntry = mappingService.fromDataDom(entry); + target.removeConfigurationData(baEntry); + } catch (DeserializationException e) { + LOG.error("Ommiting from BA transaction: {}.", entry, e); + } + } + for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) { + try { + + InstanceIdentifier baEntry = mappingService.fromDataDom(entry); + target.removeOperationalData(baEntry); + } catch (DeserializationException e) { + LOG.error("Ommiting from BA transaction: {}.", entry, e); + } + } + for (Map.Entry entry : source + .getUpdatedConfigurationData().entrySet()) { + try { + InstanceIdentifier baKey = mappingService.fromDataDom(entry.getKey()); + DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue()); + target.putConfigurationData(baKey, baData); + } catch (DeserializationException e) { + LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e); + } + } + for (Map.Entry entry : source + .getUpdatedOperationalData().entrySet()) { + try { + + InstanceIdentifier baKey = mappingService.fromDataDom(entry.getKey()); + DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue()); + target.putOperationalData(baKey, baData); + } catch (DeserializationException e) { + LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e); + } + } + return target; + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingNotificationForwarder.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingNotificationForwarder.java new file mode 100644 index 0000000000..841ea550bc --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingNotificationForwarder.java @@ -0,0 +1,70 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; +import org.opendaylight.yangtools.yang.binding.DataContainer; +import org.opendaylight.yangtools.yang.binding.Notification; +import org.opendaylight.yangtools.yang.binding.util.BindingReflections; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; + +class DomToBindingNotificationForwarder implements NotificationProviderService.NotificationInterestListener, + NotificationListener { + + private final ConcurrentMap>> notifications = new ConcurrentHashMap<>(); + private final Set supportedNotifications = new HashSet<>(); + + private final BindingIndependentMappingService mappingService; + private final NotificationProviderService baNotifyService; + private final NotificationPublishService domNotificationService; + + DomToBindingNotificationForwarder(final BindingIndependentMappingService mappingService, final NotificationProviderService baNotifyService, + final NotificationPublishService domNotificationService) { + this.mappingService = mappingService; + this.baNotifyService = baNotifyService; + this.domNotificationService = domNotificationService; + } + + @Override + public Set getSupportedNotifications() { + return Collections.unmodifiableSet(supportedNotifications); + } + + @Override + public void onNotification(final CompositeNode notification) { + QName qname = notification.getNodeType(); + WeakReference> potential = notifications.get(qname); + if (potential != null) { + Class potentialClass = potential.get(); + if (potentialClass != null) { + final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass, + notification); + + if (baNotification instanceof Notification) { + baNotifyService.publish((Notification) baNotification); + } + } + } + } + + @Override + public void onNotificationSubscribtion(final Class notificationType) { + QName qname = BindingReflections.findQName(notificationType); + if (qname != null) { + WeakReference> already = notifications.putIfAbsent(qname, + new WeakReference>(notificationType)); + if (already == null) { + domNotificationService.addNotificationListener(qname, this); + supportedNotifications.add(qname); + } + } + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwarder.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwarder.java new file mode 100644 index 0000000000..620290c9fa --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwarder.java @@ -0,0 +1,279 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.lang.ref.WeakReference; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter; +import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration; +import org.opendaylight.yangtools.concepts.ObjectRegistration; + +import org.opendaylight.yangtools.yang.binding.BaseIdentity; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.RpcService; +import org.opendaylight.yangtools.yang.binding.util.BindingReflections; +import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils; +import org.opendaylight.yangtools.yang.binding.BindingMapping; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler { + + private final Logger LOG = LoggerFactory.getLogger(DomToBindingRpcForwarder.class); + + private final Set supportedRpcs; + private final WeakReference> rpcServiceType; + private Set registrations; + private final Map strategiesByQName = new HashMap<>(); + private final WeakHashMap strategiesByMethod = new WeakHashMap<>(); + private final RpcService proxy; + private ObjectRegistration forwarderRegistration; + private boolean registrationInProgress = false; + + private final RpcProvisionRegistry biRpcRegistry; + private final RpcProviderRegistry baRpcRegistry; + private final RpcProviderRegistryImpl baRpcRegistryImpl; + + private final Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier; + + private final static Method EQUALS_METHOD; + + static { + try { + EQUALS_METHOD = Object.class.getMethod("equals", Object.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public DomToBindingRpcForwarder(final Class service, final BindingIndependentMappingService mappingService, + final RpcProvisionRegistry biRpcRegistry, final RpcProviderRegistry baRpcRegistry) { + this.rpcServiceType = new WeakReference>(service); + this.supportedRpcs = mappingService.getRpcQNamesFor(service); + + toDOMInstanceIdentifier = new Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() { + + @Override + public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier input) { + return mappingService.toDataDom(input); + } + }; + + this.biRpcRegistry = biRpcRegistry; + this.baRpcRegistry = baRpcRegistry; + baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry; + + Class cls = rpcServiceType.get(); + ClassLoader clsLoader = cls.getClassLoader(); + proxy =(RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); + createStrategies(mappingService); + } + + /** + * Constructor for Routed RPC Forwareder. + * + * @param service + * @param context + */ + public DomToBindingRpcForwarder(final Class service, + final Class context, final BindingIndependentMappingService mappingService, + final RpcProvisionRegistry biRpcRegistry, final RpcProviderRegistry baRpcRegistry) { + this(service, mappingService, biRpcRegistry, baRpcRegistry); + + final ImmutableSet.Builder registrationsBuilder = ImmutableSet.builder(); + try { + for (QName rpc : supportedRpcs) { + registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this)); + } + createDefaultDomForwarder(); + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", service.getName(), e); + } + registrations = registrationsBuilder.build(); + } + + + + private void createStrategies(final BindingIndependentMappingService mappingService) { + try { + for (QName rpc : supportedRpcs) { + RpcInvocationStrategy strategy = createInvocationStrategy(rpc, rpcServiceType.get(), mappingService); + strategiesByMethod.put(strategy.targetMethod, strategy); + strategiesByQName.put(rpc, strategy); + } + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e); + } + + } + + /** + * Registers RPC Forwarder to DOM Broker, + * this means Binding Aware Broker has implementation of RPC + * which is registered to it. + * + * If RPC Forwarder was previously registered to DOM Broker + * or to Bidning Broker this method is noop to prevent + * creating forwarding loop. + * + */ + public void registerToDOMBroker() { + if(!registrationInProgress && forwarderRegistration == null) { + registrationInProgress = true; + CompositeObjectRegistration.CompositeObjectRegistrationBuilder builder = CompositeObjectRegistration.builderFor(this); + try { + for (QName rpc : supportedRpcs) { + builder.add(biRpcRegistry.addRpcImplementation(rpc, this)); + } + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e); + } + this.forwarderRegistration = builder.toInstance(); + registrationInProgress = false; + } + } + + + public void registerPaths(final Class context, + final Class service, final Set> set) { + QName ctx = BindingReflections.findQName(context); + for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform( + toDOMInstanceIdentifier)) { + for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) { + reg.registerPath(ctx, path); + } + } + } + + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + if (EQUALS_METHOD.equals(method)) { + return false; + } + RpcInvocationStrategy strategy = strategiesByMethod.get(method); + checkState(strategy != null); + checkArgument(args.length <= 2); + if (args.length == 1) { + checkArgument(args[0] instanceof DataObject); + return strategy.forwardToDomBroker((DataObject) args[0]); + } + return strategy.forwardToDomBroker(null); + } + + public void removePaths(final Class context, final Class service, + final Set> set) { + QName ctx = BindingReflections.findQName(context); + for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform( + toDOMInstanceIdentifier)) { + for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) { + reg.unregisterPath(ctx, path); + } + } + } + + @Override + public Set getSupportedRpcs() { + return supportedRpcs; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void createDefaultDomForwarder() { + if (baRpcRegistryImpl != null) { + Class cls = rpcServiceType.get(); + ClassLoader clsLoader = cls.getClassLoader(); + RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); + + RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get()); + rpcRouter.registerDefaultService(proxy); + } + } + + @Override + public ListenableFuture> invokeRpc(final QName rpc, final CompositeNode domInput) { + checkArgument(rpc != null); + checkArgument(domInput != null); + + Class rpcType = rpcServiceType.get(); + checkState(rpcType != null); + RpcService rpcService = baRpcRegistry.getRpcService(rpcType); + checkState(rpcService != null); + CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input")); + + try { + return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput)); + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } + } + + private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) { + return strategiesByQName.get(rpc); + } + + private RpcInvocationStrategy createInvocationStrategy(final QName rpc, + final Class rpcType, final BindingIndependentMappingService mappingService) throws Exception { + return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable() { + @Override + public RpcInvocationStrategy call() throws Exception { + String methodName = BindingMapping.getMethodName(rpc); + Method targetMethod = null; + for (Method possibleMethod : rpcType.getMethods()) { + if (possibleMethod.getName().equals(methodName) + && BindingReflections.isRpcMethod(possibleMethod)) { + targetMethod = possibleMethod; + break; + } + } + checkState(targetMethod != null, "Rpc method not found"); + return new RpcInvocationStrategy(rpc, targetMethod, mappingService, biRpcRegistry); + } + + }); + } + + /** + * Registers RPC Forwarder to Binding Broker, + * this means DOM Broekr has implementation of RPC + * which is registered to it. + * + * If RPC Forwarder was previously registered to DOM Broker + * or to Bidning Broker this method is noop to prevent + * creating forwarding loop. + * + */ + public void registerToBindingBroker() { + if(!registrationInProgress && forwarderRegistration == null) { + try { + registrationInProgress = true; + this.forwarderRegistration = baRpcRegistry.addRpcImplementation((Class)rpcServiceType.get(), proxy); + } catch (Exception e) { + LOG.error("Unable to forward RPCs for {}",rpcServiceType.get(),e); + } finally { + registrationInProgress = false; + } + } + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwardingManager.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwardingManager.java new file mode 100644 index 0000000000..04495f728c --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingRpcForwardingManager.java @@ -0,0 +1,113 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import com.google.common.base.Optional; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; +import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter; +import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl; +import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.binding.BaseIdentity; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.RpcService; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; + +/** + * Manager responsible for instantiating forwarders responsible for + * forwarding of RPC invocations from DOM Broker to Binding Aware Broker + * + */ +class DomToBindingRpcForwardingManager implements + RouteChangeListener>, + RpcProviderRegistryImpl.RouterInstantiationListener, + RpcProviderRegistryImpl.GlobalRpcRegistrationListener, RpcRegistrationListener { + + private final Map, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>(); + private final BindingIndependentMappingService mappingService; + private final RpcProvisionRegistry biRpcRegistry; + private final RpcProviderRegistry baRpcRegistry; + private RpcProviderRegistryImpl registryImpl; + + DomToBindingRpcForwardingManager(final BindingIndependentMappingService mappingService, final RpcProvisionRegistry biRpcRegistry, + final RpcProviderRegistry baRpcRegistry) { + this.mappingService = mappingService; + this.biRpcRegistry = biRpcRegistry; + this.baRpcRegistry = baRpcRegistry; + } + + public RpcProviderRegistryImpl getRegistryImpl() { + return registryImpl; + } + + public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) { + this.registryImpl = registryImpl; + } + + @Override + public void onGlobalRpcRegistered(final Class cls) { + getRpcForwarder(cls, null).registerToDOMBroker(); + } + + @Override + public void onGlobalRpcUnregistered(final Class cls) { + // NOOP + } + + @Override + public void onRpcRouterCreated(final RpcRouter router) { + Class ctx = router.getContexts().iterator().next(); + getRpcForwarder(router.getServiceType(), ctx); + } + + @Override + public void onRouteChange(final RouteChange> change) { + for (Map.Entry>> entry : change.getAnnouncements().entrySet()) { + bindingRoutesAdded(entry); + } + } + + private void bindingRoutesAdded(final Map.Entry>> entry) { + Class context = entry.getKey().getRoutingContext(); + Class service = entry.getKey().getRpcService(); + if (context != null) { + getRpcForwarder(service, context).registerPaths(context, service, entry.getValue()); + } + } + + private DomToBindingRpcForwarder getRpcForwarder(final Class service, + final Class context) { + DomToBindingRpcForwarder potential = forwarders.get(service); + if (potential != null) { + return potential; + } + if (context == null) { + potential = new DomToBindingRpcForwarder(service, mappingService, biRpcRegistry, baRpcRegistry); + } else { + potential = new DomToBindingRpcForwarder(service, context, mappingService, biRpcRegistry, baRpcRegistry); + } + + forwarders.put(service, potential); + return potential; + } + + @Override + public void onRpcImplementationAdded(final QName name) { + + final Optional> rpcInterface = mappingService.getRpcServiceClassFor( + name.getNamespace().toString(), name.getFormattedRevision()); + if (rpcInterface.isPresent()) { + getRpcForwarder(rpcInterface.get(), null).registerToBindingBroker(); + } + } + + @Override + public void onRpcImplementationRemoved(final QName name) { + + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingTransaction.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingTransaction.java new file mode 100644 index 0000000000..45be5c96fa --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/DomToBindingTransaction.java @@ -0,0 +1,60 @@ +package org.opendaylight.controller.sal.binding.impl.connect.dom; + +import java.util.Collections; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +class DomToBindingTransaction implements + DataCommitHandler.DataCommitTransaction { + + private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing; + private final DataModification modification; + private final ConcurrentMap bindingOpenedTransactions; + + public DomToBindingTransaction( + final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing, + final DataModification modification, + ConcurrentMap bindingOpenedTransactions) { + super(); + this.backing = backing; + this.modification = modification; + this.bindingOpenedTransactions = bindingOpenedTransactions; + this.bindingOpenedTransactions.put(backing.getIdentifier(), this); + } + + @Override + public DataModification getModification() { + return modification; + } + + @Override + public RpcResult rollback() throws IllegalStateException { + bindingOpenedTransactions.remove(backing.getIdentifier()); + return Rpcs.getRpcResult(true, null, Collections.emptySet()); + } + + @Override + public RpcResult finish() throws IllegalStateException { + Future> result = backing.commit(); + try { + RpcResult baResult = result.get(); + bindingOpenedTransactions.remove(backing.getIdentifier()); + return Rpcs.getRpcResult(baResult.isSuccessful(), null, baResult.getErrors()); + } catch (InterruptedException e) { + throw new IllegalStateException("", e); + } catch (ExecutionException e) { + throw new IllegalStateException("", e); + } finally { + bindingOpenedTransactions.remove(backing.getIdentifier()); + } + } +} -- 2.36.6