Merge "Walk the tables directly"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
index 5d48548efd816dfb200039c12d3a948e0638f200..e48ebbc0577f1b6101a772284ba9e07b3e6580cf 100644 (file)
@@ -17,6 +17,7 @@ import java.lang.reflect.Proxy;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -37,6 +38,8 @@ import org.opendaylight.controller.md.sal.common.api.data.DataReader;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
@@ -53,6 +56,8 @@ import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
@@ -63,6 +68,7 @@ import org.opendaylight.yangtools.yang.binding.BindingMapping;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -71,6 +77,8 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,28 +95,29 @@ public class BindingIndependentConnector implements //
         Provider, //
         AutoCloseable {
 
+
+
     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
 
-    @SuppressWarnings( "deprecation")
+    @SuppressWarnings("deprecation")
     private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
 
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
             .builder().toInstance();
 
     private final static Method EQUALS_METHOD;
-    
-    
+
     private BindingIndependentMappingService mappingService;
 
     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
 
     private DataProviderService baDataService;
 
-    private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
-    private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
 
-    private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
-    private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
+    private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
+    private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
 
     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
 
@@ -121,7 +130,7 @@ public class BindingIndependentConnector implements //
     // private ListenerRegistration<BindingToDomRpcForwardingManager>
     // bindingToDomRpcManager;
 
-    private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
+    private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
 
         @Override
         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
@@ -140,12 +149,13 @@ public class BindingIndependentConnector implements //
 
     private RpcProviderRegistryImpl baRpcRegistryImpl;
 
-    private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
-    
-    
+    private NotificationProviderService baNotifyService;
+
+    private NotificationPublishService domNotificationService;
+
     static {
         try {
-        EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
+            EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -307,16 +317,17 @@ public class BindingIndependentConnector implements //
                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
             }
-            if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
-                biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
-            }
             rpcForwarding = true;
         }
     }
 
     public void startNotificationForwarding() {
         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
-        notificationForwarding = true;
+        if (baNotifyService != null && domNotificationService != null) {
+            baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
+
+            notificationForwarding = true;
+        }
     }
 
     protected void setMappingService(BindingIndependentMappingService mappingService) {
@@ -397,8 +408,8 @@ public class BindingIndependentConnector implements //
     private class BindingToDomTransaction implements
             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
 
-        private DataModificationTransaction backing;
-        private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
+        private final DataModificationTransaction backing;
+        private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
 
         public BindingToDomTransaction(DataModificationTransaction backing,
                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
@@ -458,11 +469,11 @@ public class BindingIndependentConnector implements //
     }
 
     private class DomToBindingCommitHandler implements //
-            RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject>>, //
+            RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
 
         @Override
-        public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
+        public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
 
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
                     .getPath());
@@ -470,11 +481,12 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
+        public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
             // NOOP for now
             // FIXME: do registration based on only active commit handlers.
         }
 
+        @Override
         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
             Object identifier = domTransaction.getIdentifier();
@@ -500,7 +512,7 @@ public class BindingIndependentConnector implements //
     /**
      * Manager responsible for instantiating forwarders responsible for
      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
-     * 
+     *
      */
     private class DomToBindingRpcForwardingManager implements
             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
@@ -517,17 +529,17 @@ public class BindingIndependentConnector implements //
         public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
             this.registryImpl = registryImpl;
         }
-        
+
         @Override
         public void onGlobalRpcRegistered(Class<? extends RpcService> cls) {
             getRpcForwarder(cls, null);
         }
-        
+
         @Override
         public void onGlobalRpcUnregistered(Class<? extends RpcService> cls) {
             // NOOP
         }
-        
+
         @Override
         public void onRpcRouterCreated(RpcRouter<?> router) {
             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
@@ -571,9 +583,9 @@ public class BindingIndependentConnector implements //
 
         private final Set<QName> supportedRpcs;
         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
-        private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
-        private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
-        private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
+        private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+        private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+        private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
 
         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
@@ -587,14 +599,14 @@ public class BindingIndependentConnector implements //
                 }
 
             } catch (Exception e) {
-                LOG.error("Could not forward Rpcs of type {}", service.getName());
+                LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
             }
             registrations = ImmutableSet.of();
         }
 
         /**
          * Constructor for Routed RPC Forwareder.
-         * 
+         *
          * @param service
          * @param context
          */
@@ -612,7 +624,7 @@ public class BindingIndependentConnector implements //
                 }
                 createDefaultDomForwarder();
             } catch (Exception e) {
-                LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
+                LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
             }
             registrations = registrationsBuilder.build();
         }
@@ -628,16 +640,16 @@ public class BindingIndependentConnector implements //
             }
         }
 
-        
+
         @Override
         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-            if(EQUALS_METHOD.equals(method)) {
+            if (EQUALS_METHOD.equals(method)) {
                 return false;
             }
             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
             checkState(strategy != null);
             checkArgument(args.length <= 2);
-            if(args.length == 1) {
+            if (args.length == 1) {
                 checkArgument(args[0] instanceof DataObject);
                 return strategy.forwardToDomBroker((DataObject) args[0]);
             }
@@ -666,7 +678,7 @@ public class BindingIndependentConnector implements //
                 Class<?> cls = rpcServiceType.get();
                 ClassLoader clsLoader = cls.getClassLoader();
                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
-                
+
                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
                 rpcRouter.registerDefaultService(proxy);
             }
@@ -715,12 +727,15 @@ public class BindingIndependentConnector implements //
                     RpcInvocationStrategy strategy = null;
                     if (outputClass.isPresent()) {
                         if (inputClass.isPresent()) {
-                            strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
+                            strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
+                                    .get());
                         } else {
-                            strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
+                            strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
                         }
+                    } else if(inputClass.isPresent()){
+                        strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
                     } else {
-                        strategy = null;
+                        strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
                     }
                     return strategy;
                 }
@@ -734,7 +749,7 @@ public class BindingIndependentConnector implements //
         protected final Method targetMethod;
         protected final QName rpc;
 
-        public RpcInvocationStrategy(QName rpc,Method targetMethod) {
+        public RpcInvocationStrategy(QName rpc, Method targetMethod) {
             this.targetMethod = targetMethod;
             this.rpc = rpc;
         }
@@ -752,44 +767,50 @@ public class BindingIndependentConnector implements //
     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
 
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> inputClass;
+        private final WeakReference<Class> inputClass;
 
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> outputClass;
+        private final WeakReference<Class> outputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
                 Class<? extends DataContainer> inputClass) {
-            super(rpc,targetMethod);
+            super(rpc, targetMethod);
             this.outputClass = new WeakReference(outputClass);
             this.inputClass = new WeakReference(inputClass);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
-            Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
-            if (result == null) {
+            Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
+            if (futureResult == null) {
                 return Rpcs.getRpcResult(false);
             }
-            RpcResult<?> bindingResult = result.get();
+            RpcResult<?> bindingResult = futureResult.get();
+            final Object resultObj = bindingResult.getResult();
+            if (resultObj instanceof DataObject) {
+                final CompositeNode output = mappingService.toDataDom((DataObject)resultObj);
+                return Rpcs.getRpcResult(true, output, Collections.<RpcError>emptySet());
+            }
             return Rpcs.getRpcResult(true);
         }
-        
+
         @Override
         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
-            if(biRouter != null) { 
+            if(biRpcRegistry != null) {
                 CompositeNode xml = mappingService.toDataDom(input);
-                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
-                RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
                 Object baResultValue = null;
-                if(result.getResult() != null) {
+                if (result.getResult() != null) {
                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
                 }
                 RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
-                return Futures.<RpcResult<?>>immediateFuture(baResult);
+                return Futures.<RpcResult<?>> immediateFuture(baResult);
             }
-            return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+            return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
         }
 
     }
@@ -797,22 +818,63 @@ public class BindingIndependentConnector implements //
     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
 
         public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
-            super(rpc,targetMethod);
+            super(rpc, targetMethod);
         }
 
+        @Override
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
             @SuppressWarnings("unchecked")
             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
             RpcResult<Void> bindingResult = result.get();
             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
         }
-        
+
         @Override
         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
             return Futures.immediateFuture(null);
         }
     }
 
+    private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
+
+
+        @SuppressWarnings("rawtypes")
+        private final WeakReference<Class> inputClass;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
+                Class<? extends DataContainer> inputClass) {
+            super(rpc,targetMethod);
+            this.inputClass = new WeakReference(inputClass);
+        }
+
+
+        @Override
+        public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
+            DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
+            Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
+            if (result == null) {
+                return Rpcs.getRpcResult(false);
+            }
+            RpcResult<?> bindingResult = result.get();
+            return Rpcs.getRpcResult(true);
+        }
+
+        @Override
+        public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+            if(biRpcRegistry != null) {
+                CompositeNode xml = mappingService.toDataDom(input);
+                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
+                Object baResultValue = null;
+                RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
+                return Futures.<RpcResult<?>>immediateFuture(baResult);
+            }
+            return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+        }
+
+    }
+
     public boolean isRpcForwarding() {
         return rpcForwarding;
     }
@@ -822,11 +884,60 @@ public class BindingIndependentConnector implements //
     }
 
     public boolean isNotificationForwarding() {
-        // TODO Auto-generated method stub
         return notificationForwarding;
     }
 
     public BindingIndependentMappingService getMappingService() {
         return mappingService;
     }
+
+    public void setBindingNotificationService(NotificationProviderService baService) {
+        this.baNotifyService = baService;
+
+    }
+
+    public void setDomNotificationService(NotificationPublishService domService) {
+        this.domNotificationService = domService;
+    }
+
+    private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
+
+        private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
+        private final Set<QName> supportedNotifications = new HashSet<>();
+
+        @Override
+        public Set<QName> getSupportedNotifications() {
+            return Collections.unmodifiableSet(supportedNotifications);
+        }
+
+        @Override
+        public void onNotification(CompositeNode notification) {
+            QName qname = notification.getNodeType();
+            WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
+            if (potential != null) {
+                Class<? extends Notification> potentialClass = potential.get();
+                if (potentialClass != null) {
+                    final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
+                            notification);
+
+                    if (baNotification instanceof Notification) {
+                        baNotifyService.publish((Notification) baNotification);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onNotificationSubscribtion(Class<? extends Notification> notificationType) {
+            QName qname = BindingReflections.findQName(notificationType);
+            if (qname != null) {
+                WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
+                        new WeakReference<Class<? extends Notification>>(notificationType));
+                if (already == null) {
+                    domNotificationService.addNotificationListener(qname, this);
+                    supportedNotifications.add(qname);
+                }
+            }
+        }
+    }
 }