Change RpcImplementation contract to asynchronous
authorRobert Varga <rovarga@cisco.com>
Thu, 27 Feb 2014 08:02:42 +0000 (09:02 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 14 Apr 2014 07:47:44 +0000 (07:47 +0000)
This changes the method used to invoke RPCs such that they need not be
synchronous anymore, but rather return a ListenableFuture. If an
implementation is synchronous, it should use Futures.immediateFuture()
as a wrapper.

Change-Id: I0623d2afda038ba49afa83ed6020910b74b4911e
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementation.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java

index 9edea0c2fd59fb9f50b94551fbf5574a174a2a07..93849c2e91f47142f71d7cd1fb55cf3d1a506152 100644 (file)
@@ -91,6 +91,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class BindingIndependentConnector implements //
         RuntimeDataProvider, //
@@ -699,8 +700,9 @@ public class BindingIndependentConnector implements //
             }
         }
 
+
         @Override
-        public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
+        public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
             checkArgument(rpc != null);
             checkArgument(domInput != null);
 
@@ -709,10 +711,11 @@ public class BindingIndependentConnector implements //
             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
             checkState(rpcService != null);
             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+
             try {
-                return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
+                return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
             } catch (Exception e) {
-                throw new IllegalStateException(e);
+                return Futures.immediateFailedFuture(e);
             }
         }
 
@@ -813,21 +816,25 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
-            if(biRpcRegistry != null) {
-                CompositeNode xml = mappingService.toDataDom(input);
-                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
-                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
-                Object baResultValue = null;
-                if (result.getResult() != null) {
-                    baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
-                }
-                RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
-                return Futures.<RpcResult<?>> immediateFuture(baResult);
+        public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+            if(biRpcRegistry == null) {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
             }
-            return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
-        }
 
+            CompositeNode xml = mappingService.toDataDom(input);
+            CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+            return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                @Override
+                public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+                    Object baResultValue = null;
+                    if (input.getResult() != null) {
+                        baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), input.getResult());
+                    }
+                    return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+                }
+            });
+        }
     }
 
     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
@@ -876,18 +883,21 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
-            if(biRpcRegistry != null) {
-                CompositeNode xml = mappingService.toDataDom(input);
-                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
-                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
-                Object baResultValue = null;
-                RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
-                return Futures.<RpcResult<?>>immediateFuture(baResult);
+        public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+            if(biRpcRegistry == null) {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
             }
-            return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
-        }
 
+            CompositeNode xml = mappingService.toDataDom(input);
+            CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+
+            return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                @Override
+                public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+                    return Rpcs.<Void>getRpcResult(input.isSuccessful(), null, input.getErrors());
+                }
+            });
+        }
     }
 
     public boolean isRpcForwarding() {
index 4bad2bbb86140a5f04ac6a704b84e0b314cad76b..d6d87bac84b307d388147cc3bd44c0346b5afa77 100644 (file)
@@ -351,7 +351,6 @@ public class BindingTestContext implements AutoCloseable, SchemaContextProvider
     private void startDomBroker() {
         checkState(executor != null);
         biBrokerImpl = new BrokerImpl();
-        biBrokerImpl.setExecutor(executor);
         biBrokerImpl.setRouter(new SchemaAwareRpcBroker("/", this));
 
     }
index c8eb7fd56f1f68654a14785e5a5f81121e47a692..4f11ba066110be3f73d855a5c4871195fc47306c 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.core.api;
 
 import org.opendaylight.yangtools.yang.common.QName;
@@ -5,8 +12,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 public interface RoutedRpcDefaultImplementation {
 
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
+    ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
 
 }
index 6b1030a81500f5a909d6df30f69e52208025c482..38b33d5d2a509f228a3fcc88a28c3b4f64a11dad 100644 (file)
@@ -15,6 +15,8 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 /**
  * {@link Provider}'s implementation of an RPC.
  *
@@ -42,8 +44,6 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode;
  * {@link RpcResult}
  * <li> {@link Broker} returns the {@link RpcResult} to {@link Consumer}
  * </ol>
- *
- *
  */
 public interface RpcImplementation extends Provider.ProviderFunctionality {
 
@@ -59,13 +59,12 @@ public interface RpcImplementation extends Provider.ProviderFunctionality {
     Set<QName> getSupportedRpcs();
 
     /**
-     * Invokes a implementation of specified rpc.
-     *
+     * Invokes a implementation of specified RPC asynchronously.
      *
      * @param rpc
-     *            Rpc to be invoked
+     *            RPC to be invoked
      * @param input
-     *            Input data for rpc.
+     *            Input data for the RPC.
      *
      * @throws IllegalArgumentException
      *             <ul>
@@ -73,9 +72,9 @@ public interface RpcImplementation extends Provider.ProviderFunctionality {
      *             <li>If input is not <code>null</code> and
      *             <code>false == rpc.equals(input.getNodeType)</code>
      *             </ul>
-     * @return RpcResult containing the output of rpc if was executed
-     *         successfully, the list of errors otherwise.
+     * @return Future promising an RpcResult containing the output of
+     *         the RPC if was executed successfully, the list of errors
+     *         otherwise.
      */
-    RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
-
+    ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input);
 }
index 64de8683d1d1a8a5c3854aa5f55e6b907852c448..0ed14c1027905f22fff667201d5a3294ed6afc56 100644 (file)
@@ -7,13 +7,11 @@
  */
 package org.opendaylight.controller.sal.dom.broker;
 
+import com.google.common.util.concurrent.ListenableFuture
 import java.util.Collections
 import java.util.HashSet
 import java.util.Set
-import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.core.api.Broker
 import org.opendaylight.controller.sal.core.api.Consumer
 import org.opendaylight.controller.sal.core.api.Provider
@@ -26,7 +24,6 @@ import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
 import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
@@ -39,12 +36,9 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
         new HashSet<ProviderContextImpl>());
 
-    // Implementation specific
-    @Property
-    private var ExecutorService executor = Executors.newFixedThreadPool(5);
     @Property
     private var BundleContext bundleContext;
-    
+
     @Property
     private var AutoCloseable deactivator;
 
@@ -69,9 +63,8 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
         return session;
     }
 
-    protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
-        val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
-        return result;
+    protected def ListenableFuture<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
+        return router.invokeRpc(rpc, input);
     }
 
     // Validation
@@ -111,15 +104,15 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
         sessions.remove(consumerContextImpl);
         providerSessions.remove(consumerContextImpl);
     }
-    
+
     override close() throws Exception {
         deactivator?.close();
     }
-    
+
     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
         router.addRpcImplementation(rpcType,implementation);
     }
-    
+
     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         router.addRoutedRpcImplementation(rpcType,implementation);
     }
@@ -131,17 +124,17 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     override addRpcRegistrationListener(RpcRegistrationListener listener) {
         return router.addRpcRegistrationListener(listener);
     }
-    
+
     override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
         return router.registerRouteChangeListener(listener);
     }
 
-    override invokeRpc(QName rpc,CompositeNode input){
-        return router.invokeRpc(rpc,input)
-    }
-
     override getSupportedRpcs() {
         return router.getSupportedRpcs();
     }
-    
+
+    override invokeRpc(QName rpc, CompositeNode input) {
+        return router.invokeRpc(rpc,input)
+    }
+
 }
index f9f977e3c24d67abe809ce2f3648df5f63960d83..263f0500fd1c280b470171352716c70b723c18c7 100644 (file)
@@ -123,9 +123,8 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
         return rpcs.getSupportedRpcs();
     }
 
-
     @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
         return rpcs.invokeRpc(rpc, input);
     }
 
@@ -134,7 +133,6 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
         return rpcs.addRpcRegistrationListener(listener);
     }
 
-
     @Override
     public ListenableFuture<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
         return null;
@@ -228,6 +226,4 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
             L listener) {
         return rpcs.registerRouteChangeListener(listener);
     }
-
-
 }
index 598361c3ae3cbf7e41eed5a69ea9cf0d74727702..3e7b115f11e28667114c798062bd4737cda9f333 100644 (file)
@@ -46,6 +46,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
 
@@ -85,16 +86,16 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
         this.schemaProvider = schemaProvider;
     }
 
-  public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
-    return defaultDelegate;
-  }
+    public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+        return defaultDelegate;
+    }
 
     @Override
-  public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
-    this.defaultDelegate = defaultDelegate;
-  }
+    public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+        this.defaultDelegate = defaultDelegate;
+    }
 
-  @Override
+    @Override
     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         checkArgument(rpcType != null, "RPC Type should not be null");
         checkArgument(implementation != null, "RPC Implementatoin should not be null");
@@ -163,7 +164,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
     }
 
     @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
         return findRpcImplemention(rpc).invokeRpc(rpc, input);
     }
 
@@ -235,7 +236,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
     }
 
     @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
       checkState(defaultDelegate != null);
       return defaultDelegate.invokeRpc(rpc, identifier, input);
     }
@@ -319,7 +320,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
         }
 
         @Override
-        public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+        public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
             checkArgument(inputContainer != null, "Rpc payload must contain input element");
             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
index 40842c004a2779ede0d231b2fef122920bc4ebc9..6e44cba494eda0d29d333ba89492aea18cba55f6 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.sal.dom.broker.osgi;
 
+import java.util.Set;
+
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.sal.core.api.*;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -17,7 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.osgi.framework.ServiceReference;
 
-import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcProvisionRegistry>
                                        implements RpcProvisionRegistry {
@@ -41,24 +43,23 @@ public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcPro
         return getDelegate().addRoutedRpcImplementation(rpcType, implementation);
     }
 
-  @Override
-  public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
-    getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
-  }
+    @Override
+    public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+        getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
+    }
 
-  @Override
+    @Override
     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
         return getDelegate().registerRouteChangeListener(listener);
     }
 
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        return getDelegate().getSupportedRpcs();
+    }
 
-  @Override
-  public Set<QName> getSupportedRpcs() {
-    return getDelegate().getSupportedRpcs();
-  }
-
-  @Override
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-    return getDelegate().invokeRpc(rpc,input);
-  }
+    @Override
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+        return getDelegate().invokeRpc(rpc, input);
+    }
 }
index 02419ff529ac7627edf04c00320f9771b05b0b28..2976c76ffa0a8ea91679baa975ecf2e77efc14cf 100644 (file)
@@ -8,27 +8,20 @@
 package org.opendaylight.controller.sal.dom.broker.spi;
 
 import java.util.Map;
-import java.util.Set;
 
 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
 public interface RoutedRpcProcessor extends RpcImplementation {
 
-    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+    RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
 
-    public Set<QName> getSupportedRpcs();
-
-    public QName getRpcType();
-    
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+    QName getRpcType();
 
     Map<InstanceIdentifier,RpcImplementation> getRoutes();
-    
+
     RpcImplementation getDefaultRoute();
 
 }
index d1523a01d695f2033ee9bfa557b1aaf0ea4c592d..b19dac5535bbf3facbd72df32470bc719bb00ff8 100644 (file)
@@ -14,8 +14,6 @@ import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
 public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
 
@@ -28,7 +26,4 @@ public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
 
     @Override
     public Set<QName> getSupportedRpcs();
-
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
 }