More defensive RPC handling in DOM Broker
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
index 75b0138e7cd75657adba42182d0dc7e0cf9c270c..7a7e086ec4c9098ca91e6a7897745eea688819e0 100644 (file)
@@ -1,10 +1,22 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -28,13 +40,15 @@ import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublishe
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
+import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
-import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
+import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
 import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
@@ -54,15 +68,18 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
-import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.util.concurrent.Futures;
 
 public class BindingIndependentConnector implements //
         RuntimeDataProvider, //
@@ -71,11 +88,15 @@ public class BindingIndependentConnector implements //
 
     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
 
+    @SuppressWarnings( "deprecation")
     private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
 
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
             .builder().toInstance();
 
+    private final static Method EQUALS_METHOD;
+    
+    
     private BindingIndependentMappingService mappingService;
 
     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
@@ -116,18 +137,33 @@ public class BindingIndependentConnector implements //
 
     private boolean notificationForwarding = false;
 
+    private RpcProviderRegistryImpl baRpcRegistryImpl;
+
+    private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
+    
+    
+    static {
+        try {
+        EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
         try {
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
             CompositeNode result = biDataService.readOperationalData(biPath);
-            return potentialAugmentationRead(path,biPath,result);
+            return potentialAugmentationRead(path, biPath, result);
         } catch (DeserializationException e) {
             throw new IllegalStateException(e);
         }
     }
 
-    private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result) throws DeserializationException {
+    private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
+            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result)
+            throws DeserializationException {
         Class<? extends DataObject> targetType = path.getTargetType();
         if (Augmentation.class.isAssignableFrom(targetType)) {
             path = mappingService.fromDataDom(biPath);
@@ -145,7 +181,7 @@ public class BindingIndependentConnector implements //
         try {
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
             CompositeNode result = biDataService.readConfigurationData(biPath);
-            return potentialAugmentationRead(path,biPath,result);
+            return potentialAugmentationRead(path, biPath, result);
         } catch (DeserializationException e) {
             throw new IllegalStateException(e);
         }
@@ -255,15 +291,22 @@ public class BindingIndependentConnector implements //
         baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
         dataForwarding = true;
     }
-    
+
     public void startRpcForwarding() {
         if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
-            checkState(!rpcForwarding,"Connector is already forwarding RPCs");
+            checkState(!rpcForwarding, "Connector is already forwarding RPCs");
             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
+            if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
+                baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
+                baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
+            }
+            if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
+                biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
+            }
             rpcForwarding = true;
         }
     }
-    
+
     public void startNotificationForwarding() {
         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
         notificationForwarding = true;
@@ -282,7 +325,7 @@ public class BindingIndependentConnector implements //
     public void onSessionInitiated(ProviderSession session) {
         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
-        
+
     }
 
     public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
@@ -447,10 +490,32 @@ public class BindingIndependentConnector implements //
         }
     }
 
+    /**
+     * Manager responsible for instantiating forwarders responsible for
+     * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
+     * 
+     */
     private class DomToBindingRpcForwardingManager implements
-            RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>> {
+            RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
+            RouterInstantiationListener {
 
         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
+        private RpcProviderRegistryImpl registryImpl;
+
+        public RpcProviderRegistryImpl getRegistryImpl() {
+            return registryImpl;
+        }
+
+        public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
+            this.registryImpl = registryImpl;
+        }
+        
+        
+        @Override
+        public void onRpcRouterCreated(RpcRouter<?> router) {
+            Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
+            getRpcForwarder(router.getServiceType(), ctx);
+        }
 
         @Override
         public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
@@ -478,35 +543,61 @@ public class BindingIndependentConnector implements //
             } else {
                 potential = new DomToBindingRpcForwarder(service, context);
             }
+
             forwarders.put(service, potential);
             return potential;
         }
 
     }
 
-    private class DomToBindingRpcForwarder implements RpcImplementation {
+    private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
 
         private final Set<QName> supportedRpcs;
         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
         private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+        private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+        private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
 
         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
-            for (QName rpc : supportedRpcs) {
-                biRpcRegistry.addRpcImplementation(rpc, this);
+            try {
+                for (QName rpc : supportedRpcs) {
+                    RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
+                    strategiesByMethod.put(strategy.targetMethod, strategy);
+                    strategiesByQName.put(rpc, strategy);
+                    biRpcRegistry.addRpcImplementation(rpc, this);
+                }
+
+            } catch (Exception e) {
+                LOG.error("Could not forward Rpcs of type {}", service.getName());
             }
             registrations = ImmutableSet.of();
         }
 
+        /**
+         * Constructor for Routed RPC Forwareder.
+         * 
+         * @param service
+         * @param context
+         */
         public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
-            registrations = new HashSet<>();
-            for (QName rpc : supportedRpcs) {
-                registrations.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+            Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
+                    .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
+            try {
+                for (QName rpc : supportedRpcs) {
+                    RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
+                    strategiesByMethod.put(strategy.targetMethod, strategy);
+                    strategiesByQName.put(rpc, strategy);
+                    registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+                }
+                createDefaultDomForwarder();
+            } catch (Exception e) {
+                LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
             }
-            registrations = ImmutableSet.copyOf(registrations);
+            registrations = registrationsBuilder.build();
         }
 
         public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
@@ -520,6 +611,22 @@ public class BindingIndependentConnector implements //
             }
         }
 
+        
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            if(EQUALS_METHOD.equals(method)) {
+                return false;
+            }
+            RpcInvocationStrategy strategy = strategiesByMethod.get(method);
+            checkState(strategy != null);
+            checkArgument(args.length <= 2);
+            if(args.length == 1) {
+                checkArgument(args[0] instanceof DataObject);
+                return strategy.forwardToDomBroker((DataObject) args[0]);
+            }
+            return strategy.forwardToDomBroker(null);
+        }
+
         public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
                 Set<InstanceIdentifier<?>> set) {
             QName ctx = BindingReflections.findQName(context);
@@ -536,6 +643,18 @@ public class BindingIndependentConnector implements //
             return supportedRpcs;
         }
 
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        public void createDefaultDomForwarder() {
+            if (baRpcRegistryImpl != null) {
+                Class<?> cls = rpcServiceType.get();
+                ClassLoader clsLoader = cls.getClassLoader();
+                RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
+                
+                RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
+                rpcRouter.registerDefaultService(proxy);
+            }
+        }
+
         @Override
         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
             checkArgument(rpc != null);
@@ -547,13 +666,17 @@ public class BindingIndependentConnector implements //
             checkState(rpcService != null);
             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
             try {
-                return resolveInvocationStrategy(rpc, rpcType).invokeOn(rpcService, domUnwrappedInput);
+                return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
             } catch (Exception e) {
                 throw new IllegalStateException(e);
             }
         }
 
-        private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc,
+        private RpcInvocationStrategy resolveInvocationStrategy(QName rpc) {
+            return strategiesByQName.get(rpc);
+        }
+
+        private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
                 final Class<? extends RpcService> rpcType) throws Exception {
             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
                 @Override
@@ -575,9 +698,9 @@ public class BindingIndependentConnector implements //
                     RpcInvocationStrategy strategy = null;
                     if (outputClass.isPresent()) {
                         if (inputClass.isPresent()) {
-                            strategy = new DefaultInvocationStrategy(targetMethod, outputClass.get(), inputClass.get());
+                            strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
                         } else {
-                            strategy = new NoInputNoOutputInvocationStrategy(targetMethod);
+                            strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
                         }
                     } else {
                         strategy = null;
@@ -592,11 +715,15 @@ public class BindingIndependentConnector implements //
     private abstract class RpcInvocationStrategy {
 
         protected final Method targetMethod;
+        protected final QName rpc;
 
-        public RpcInvocationStrategy(Method targetMethod) {
+        public RpcInvocationStrategy(QName rpc,Method targetMethod) {
             this.targetMethod = targetMethod;
+            this.rpc = rpc;
         }
 
+        public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
+
         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
                 throws Exception;
 
@@ -614,9 +741,9 @@ public class BindingIndependentConnector implements //
         private WeakReference<Class> outputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
-        public DefaultInvocationStrategy(Method targetMethod, Class<?> outputClass,
+        public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
                 Class<? extends DataContainer> inputClass) {
-            super(targetMethod);
+            super(rpc,targetMethod);
             this.outputClass = new WeakReference(outputClass);
             this.inputClass = new WeakReference(inputClass);
         }
@@ -631,13 +758,29 @@ public class BindingIndependentConnector implements //
             RpcResult<?> bindingResult = result.get();
             return Rpcs.getRpcResult(true);
         }
+        
+        @Override
+        public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+            if(biRouter != null) { 
+                CompositeNode xml = mappingService.toDataDom(input);
+                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+                RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+                Object baResultValue = null;
+                if(result.getResult() != null) {
+                    baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
+                }
+                RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
+                return Futures.<RpcResult<?>>immediateFuture(baResult);
+            }
+            return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+        }
 
     }
 
     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
 
-        public NoInputNoOutputInvocationStrategy(Method targetMethod) {
-            super(targetMethod);
+        public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
+            super(rpc,targetMethod);
         }
 
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
@@ -646,6 +789,11 @@ public class BindingIndependentConnector implements //
             RpcResult<Void> bindingResult = result.get();
             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
         }
+        
+        @Override
+        public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+            return Futures.immediateFuture(null);
+        }
     }
 
     public boolean isRpcForwarding() {