Bug-574 Added a 4th RPC Invocation strategy for the case where the RPC has an output...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
index 37c0dfa60750280b9fe0963bf223a131b0e64753..b27c80d784e39fce232ae227a752f3dd3b77ab99 100644 (file)
@@ -35,7 +35,6 @@ import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
@@ -46,6 +45,8 @@ import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
+import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
+import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
@@ -90,19 +91,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class BindingIndependentConnector implements //
         RuntimeDataProvider, //
         Provider, //
         AutoCloseable {
 
-
-
     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
 
-    @SuppressWarnings("deprecation")
-    private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
-
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
             .builder().toInstance();
 
@@ -140,8 +137,6 @@ public class BindingIndependentConnector implements //
 
     };
 
-    private Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> baDataReaderRegistration;
-
     private boolean rpcForwarding = false;
 
     private boolean dataForwarding = false;
@@ -202,30 +197,30 @@ public class BindingIndependentConnector implements //
     private DataModificationTransaction createBindingToDomTransaction(
             final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
         DataModificationTransaction target = biDataService.beginTransaction();
-        LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
+        LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier());
         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
             target.removeConfigurationData(biEntry);
-            LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
+            LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry);
         }
         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
             target.removeOperationalData(biEntry);
-            LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
+            LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry);
         }
         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
                 .entrySet()) {
             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
                     .toDataDom(entry);
             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
-            LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
+            LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry);
         }
         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
                 .entrySet()) {
             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
                     .toDataDom(entry);
             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
-            LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
+            LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry);
         }
 
         return target;
@@ -281,7 +276,8 @@ public class BindingIndependentConnector implements //
         return biDataService;
     }
 
-    protected void setDomDataService(final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
+    protected void setDomDataService(
+            final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
         this.biDataService = biDataService;
     }
 
@@ -302,20 +298,32 @@ public class BindingIndependentConnector implements //
     }
 
     public void startDataForwarding() {
-        if(baDataService instanceof AbstractForwardedDataBroker) {
+        if (baDataService instanceof AbstractForwardedDataBroker) {
             dataForwarding = true;
             return;
         }
-        checkState(!dataForwarding, "Connector is already forwarding data.");
-        baDataReaderRegistration = baDataService.registerDataReader(ROOT, this);
-        baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler);
-        biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
-        baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
+
+        final DataProviderService baData;
+        if (baDataService instanceof BindingMountPointImpl) {
+            baData = ((BindingMountPointImpl) baDataService).getDataBrokerImpl();
+            LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService);
+        } else {
+            baData = baDataService;
+        }
+
+        if (baData instanceof DataBrokerImpl) {
+            checkState(!dataForwarding, "Connector is already forwarding data.");
+            ((DataBrokerImpl) baData).setDataReadDelegate(this);
+            ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler);
+            biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
+            baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
+        }
+
         dataForwarding = true;
     }
 
     public void startRpcForwarding() {
-        if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
+        if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
@@ -468,8 +476,8 @@ public class BindingIndependentConnector implements //
             }
             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
-            LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
-                    domTransaction.getIdentifier());
+            LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .",
+                    bindingTransaction.getIdentifier(), domTransaction.getIdentifier());
             return wrapped;
         }
     }
@@ -479,7 +487,8 @@ public class BindingIndependentConnector implements //
             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
 
         @Override
-        public void onRegister(final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
+        public void onRegister(
+                final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
 
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
                     .getPath());
@@ -487,7 +496,8 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public void onUnregister(final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
+        public void onUnregister(
+                final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
             // NOOP for now
             // FIXME: do registration based on only active commit handlers.
         }
@@ -521,8 +531,7 @@ public class BindingIndependentConnector implements //
      *
      */
     private class DomToBindingRpcForwardingManager implements
-            RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
-            RouterInstantiationListener,
+            RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>, RouterInstantiationListener,
             GlobalRpcRegistrationListener {
 
         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
@@ -605,7 +614,7 @@ public class BindingIndependentConnector implements //
                 }
 
             } catch (Exception e) {
-                LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
+                LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
             }
             registrations = ImmutableSet.of();
         }
@@ -616,7 +625,8 @@ public class BindingIndependentConnector implements //
          * @param service
          * @param context
          */
-        public DomToBindingRpcForwarder(final Class<? extends RpcService> service, final Class<? extends BaseIdentity> context) {
+        public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
+                final Class<? extends BaseIdentity> context) {
             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
@@ -635,8 +645,8 @@ public class BindingIndependentConnector implements //
             registrations = registrationsBuilder.build();
         }
 
-        public void registerPaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
-                final Set<InstanceIdentifier<?>> set) {
+        public void registerPaths(final Class<? extends BaseIdentity> context,
+                final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
             QName ctx = BindingReflections.findQName(context);
             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
                     toDOMInstanceIdentifier)) {
@@ -646,7 +656,6 @@ public class BindingIndependentConnector implements //
             }
         }
 
-
         @Override
         public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
             if (EQUALS_METHOD.equals(method)) {
@@ -691,7 +700,7 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
+        public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
             checkArgument(rpc != null);
             checkArgument(domInput != null);
 
@@ -700,10 +709,11 @@ public class BindingIndependentConnector implements //
             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
             checkState(rpcService != null);
             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+
             try {
-                return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
+                return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
             } catch (Exception e) {
-                throw new IllegalStateException(e);
+                return Futures.immediateFailedFuture(e);
             }
         }
 
@@ -736,12 +746,12 @@ public class BindingIndependentConnector implements //
                             strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
                                     .get());
                         } else {
-                            strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
+                            strategy = new NoInputInvocationStrategy(rpc, targetMethod, outputClass.get());
                         }
-                    } else if(inputClass.isPresent()){
-                        strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
+                    } else if (inputClass.isPresent()) {
+                        strategy = new NoOutputInvocationStrategy(rpc, targetMethod, inputClass.get());
                     } else {
-                        strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
+                        strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
                     }
                     return strategy;
                 }
@@ -765,7 +775,8 @@ public class BindingIndependentConnector implements //
         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
                 throws Exception;
 
-        public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+        public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput)
+                throws Exception {
             return uncheckedInvoke(rpcService, domInput);
         }
     }
@@ -788,7 +799,8 @@ public class BindingIndependentConnector implements //
 
         @SuppressWarnings("unchecked")
         @Override
-        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+                throws Exception {
             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
             Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
             if (futureResult == null) {
@@ -797,28 +809,85 @@ public class BindingIndependentConnector implements //
             RpcResult<?> bindingResult = futureResult.get();
             final Object resultObj = bindingResult.getResult();
             if (resultObj instanceof DataObject) {
-                final CompositeNode output = mappingService.toDataDom((DataObject)resultObj);
-                return Rpcs.getRpcResult(true, output, Collections.<RpcError>emptySet());
+                final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
+                return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
+            }
+            return Rpcs.getRpcResult(true);
+        }
+
+        @Override
+        public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+            if (biRpcRegistry == null) {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
+            }
+
+            CompositeNode xml = mappingService.toDataDom(input);
+            CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+            return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
+                    new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                        @Override
+                        public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
+                            Object baResultValue = null;
+                            if (input.getResult() != null) {
+                                baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
+                                        input.getResult());
+                            }
+                            return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+                        }
+                    });
+        }
+    }
+
+    private class NoInputInvocationStrategy extends RpcInvocationStrategy {
+
+        @SuppressWarnings("rawtypes")
+        private final WeakReference<Class> outputClass;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public NoInputInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass) {
+            super(rpc, targetMethod);
+            this.outputClass = new WeakReference(outputClass);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+                throws Exception {
+            Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService);
+            if (futureResult == null) {
+                return Rpcs.getRpcResult(false);
+            }
+            RpcResult<?> bindingResult = futureResult.get();
+            final Object resultObj = bindingResult.getResult();
+            if (resultObj instanceof DataObject) {
+                final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
+                return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
             }
             return Rpcs.getRpcResult(true);
         }
 
         @Override
         public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
-            if(biRpcRegistry != null) {
+            if (biRpcRegistry != null) {
                 CompositeNode xml = mappingService.toDataDom(input);
                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
-                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
-                Object baResultValue = null;
-                if (result.getResult() != null) {
-                    baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
-                }
-                RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
-                return Futures.<RpcResult<?>> immediateFuture(baResult);
+                return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
+                        new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                            @Override
+                            public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
+                                Object baResultValue = null;
+                                if (input.getResult() != null) {
+                                    baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
+                                            input.getResult());
+                                }
+                                return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+                            }
+                        });
+            } else {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
             }
-            return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
         }
-
     }
 
     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
@@ -828,7 +897,8 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+                throws Exception {
             @SuppressWarnings("unchecked")
             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
             RpcResult<Void> bindingResult = result.get();
@@ -843,20 +913,19 @@ public class BindingIndependentConnector implements //
 
     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
 
-
         @SuppressWarnings("rawtypes")
         private final WeakReference<Class> inputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod,
                 final Class<? extends DataContainer> inputClass) {
-            super(rpc,targetMethod);
+            super(rpc, targetMethod);
             this.inputClass = new WeakReference(inputClass);
         }
 
-
         @Override
-        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+        public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+                throws Exception {
             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
             if (result == null) {
@@ -867,18 +936,22 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
-            if(biRpcRegistry != null) {
-                CompositeNode xml = mappingService.toDataDom(input);
-                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
-                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
-                Object baResultValue = null;
-                RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
-                return Futures.<RpcResult<?>>immediateFuture(baResult);
+        public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+            if (biRpcRegistry == null) {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
             }
-            return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
-        }
 
+            CompositeNode xml = mappingService.toDataDom(input);
+            CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+            return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
+                    new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                        @Override
+                        public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
+                            return Rpcs.<Void> getRpcResult(input.isSuccessful(), null, input.getErrors());
+                        }
+                    });
+        }
     }
 
     public boolean isRpcForwarding() {