Change RpcImplementation contract to asynchronous 85/5485/9
authorRobert Varga <rovarga@cisco.com>
Thu, 27 Feb 2014 08:02:42 +0000 (09:02 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 14 Apr 2014 07:47:44 +0000 (07:47 +0000)
This changes the method used to invoke RPCs such that they need not be
synchronous anymore, but rather return a ListenableFuture. If an
implementation is synchronous, it should use Futures.immediateFuture()
as a wrapper.

Change-Id: I0623d2afda038ba49afa83ed6020910b74b4911e
Signed-off-by: Robert Varga <rovarga@cisco.com>
17 files changed:
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementation.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java

index 9edea0c..93849c2 100644 (file)
@@ -91,6 +91,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class BindingIndependentConnector implements //
         RuntimeDataProvider, //
@@ -699,8 +700,9 @@ public class BindingIndependentConnector implements //
             }
         }
 
+
         @Override
-        public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
+        public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
             checkArgument(rpc != null);
             checkArgument(domInput != null);
 
@@ -709,10 +711,11 @@ public class BindingIndependentConnector implements //
             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
             checkState(rpcService != null);
             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+
             try {
-                return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
+                return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
             } catch (Exception e) {
-                throw new IllegalStateException(e);
+                return Futures.immediateFailedFuture(e);
             }
         }
 
@@ -813,21 +816,25 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
-            if(biRpcRegistry != null) {
-                CompositeNode xml = mappingService.toDataDom(input);
-                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
-                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
-                Object baResultValue = null;
-                if (result.getResult() != null) {
-                    baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
-                }
-                RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
-                return Futures.<RpcResult<?>> immediateFuture(baResult);
+        public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+            if(biRpcRegistry == null) {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
             }
-            return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
-        }
 
+            CompositeNode xml = mappingService.toDataDom(input);
+            CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+            return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                @Override
+                public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+                    Object baResultValue = null;
+                    if (input.getResult() != null) {
+                        baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), input.getResult());
+                    }
+                    return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+                }
+            });
+        }
     }
 
     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
@@ -876,18 +883,21 @@ public class BindingIndependentConnector implements //
         }
 
         @Override
-        public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
-            if(biRpcRegistry != null) {
-                CompositeNode xml = mappingService.toDataDom(input);
-                CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
-                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
-                Object baResultValue = null;
-                RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
-                return Futures.<RpcResult<?>>immediateFuture(baResult);
+        public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+            if(biRpcRegistry == null) {
+                return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
             }
-            return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
-        }
 
+            CompositeNode xml = mappingService.toDataDom(input);
+            CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+
+            return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+                @Override
+                public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+                    return Rpcs.<Void>getRpcResult(input.isSuccessful(), null, input.getErrors());
+                }
+            });
+        }
     }
 
     public boolean isRpcForwarding() {
index 4bad2bb..d6d87ba 100644 (file)
@@ -351,7 +351,6 @@ public class BindingTestContext implements AutoCloseable, SchemaContextProvider
     private void startDomBroker() {
         checkState(executor != null);
         biBrokerImpl = new BrokerImpl();
-        biBrokerImpl.setExecutor(executor);
         biBrokerImpl.setRouter(new SchemaAwareRpcBroker("/", this));
 
     }
index b1547b6..ca38ed0 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 
 public class CrossBrokerRpcTest {
@@ -64,7 +65,7 @@ public class CrossBrokerRpcTest {
     public static final NodeId NODE_B = new NodeId("b");
     public static final NodeId NODE_C = new NodeId("c");
     public static final NodeId NODE_D = new NodeId("d");
-    
+
     private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "id");
     private static final QName ADD_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "add-flow");
 
@@ -78,7 +79,7 @@ public class CrossBrokerRpcTest {
     public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_C_ID = createBINodeIdentifier(NODE_C);
     public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_D_ID = createBINodeIdentifier(NODE_D);
 
-    
+
 
     @Before
     public void setup() {
@@ -99,7 +100,7 @@ public class CrossBrokerRpcTest {
     }
 
     @Test
-    public void bindingRoutedRpcProvider_DomInvokerTest() {
+    public void bindingRoutedRpcProvider_DomInvokerTest() throws Exception {
 
         flowService//
                 .registerPath(NodeContext.class, BA_NODE_A_ID) //
@@ -114,7 +115,7 @@ public class CrossBrokerRpcTest {
 
         CompositeNode addFlowDom = toDomRpc(ADD_FLOW_QNAME, addFlowA);
         assertNotNull(addFlowDom);
-        RpcResult<CompositeNode> domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom);
+        RpcResult<CompositeNode> domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom).get();
         assertNotNull(domResult);
         assertTrue("DOM result is successful.", domResult.isSuccessful());
         assertTrue("Bidning Add Flow RPC was captured.", flowService.getReceivedAddFlows().containsKey(BA_NODE_A_ID));
@@ -128,18 +129,18 @@ public class CrossBrokerRpcTest {
         final AddFlowOutput output = builder.build();
         org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration registration = biRpcRegistry.addRoutedRpcImplementation(ADD_FLOW_QNAME, new RpcImplementation() {
             @Override
-            public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-                CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output);
-                return Rpcs.getRpcResult(true, result, ImmutableList.<RpcError>of());
+            public Set<QName> getSupportedRpcs() {
+                return ImmutableSet.of(ADD_FLOW_QNAME);
             }
 
             @Override
-            public Set<QName> getSupportedRpcs() {
-                return ImmutableSet.of(ADD_FLOW_QNAME);
+            public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+                CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output);
+                return Futures.immediateFuture(Rpcs.getRpcResult(true, result, ImmutableList.<RpcError>of()));
             }
         });
         registration.registerPath(NodeContext.QNAME, BI_NODE_C_ID);
-        
+
         SalFlowService baFlowInvoker = baRpcRegistry.getRpcService(SalFlowService.class);
         Future<RpcResult<AddFlowOutput>> baResult = baFlowInvoker.addFlow(addFlow(BA_NODE_C_ID).setPriority(500).build());
         assertNotNull(baResult);
index c8eb7fd..4f11ba0 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.core.api;
 
 import org.opendaylight.yangtools.yang.common.QName;
@@ -5,8 +12,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 public interface RoutedRpcDefaultImplementation {
 
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
+    ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
 
 }
index 6b1030a..38b33d5 100644 (file)
@@ -15,6 +15,8 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 /**
  * {@link Provider}'s implementation of an RPC.
  *
@@ -42,8 +44,6 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode;
  * {@link RpcResult}
  * <li> {@link Broker} returns the {@link RpcResult} to {@link Consumer}
  * </ol>
- *
- *
  */
 public interface RpcImplementation extends Provider.ProviderFunctionality {
 
@@ -59,13 +59,12 @@ public interface RpcImplementation extends Provider.ProviderFunctionality {
     Set<QName> getSupportedRpcs();
 
     /**
-     * Invokes a implementation of specified rpc.
-     *
+     * Invokes a implementation of specified RPC asynchronously.
      *
      * @param rpc
-     *            Rpc to be invoked
+     *            RPC to be invoked
      * @param input
-     *            Input data for rpc.
+     *            Input data for the RPC.
      *
      * @throws IllegalArgumentException
      *             <ul>
@@ -73,9 +72,9 @@ public interface RpcImplementation extends Provider.ProviderFunctionality {
      *             <li>If input is not <code>null</code> and
      *             <code>false == rpc.equals(input.getNodeType)</code>
      *             </ul>
-     * @return RpcResult containing the output of rpc if was executed
-     *         successfully, the list of errors otherwise.
+     * @return Future promising an RpcResult containing the output of
+     *         the RPC if was executed successfully, the list of errors
+     *         otherwise.
      */
-    RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
-
+    ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input);
 }
index 64de868..0ed14c1 100644 (file)
@@ -7,13 +7,11 @@
  */
 package org.opendaylight.controller.sal.dom.broker;
 
+import com.google.common.util.concurrent.ListenableFuture
 import java.util.Collections
 import java.util.HashSet
 import java.util.Set
-import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.core.api.Broker
 import org.opendaylight.controller.sal.core.api.Consumer
 import org.opendaylight.controller.sal.core.api.Provider
@@ -26,7 +24,6 @@ import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
 import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
@@ -39,12 +36,9 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
         new HashSet<ProviderContextImpl>());
 
-    // Implementation specific
-    @Property
-    private var ExecutorService executor = Executors.newFixedThreadPool(5);
     @Property
     private var BundleContext bundleContext;
-    
+
     @Property
     private var AutoCloseable deactivator;
 
@@ -69,9 +63,8 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
         return session;
     }
 
-    protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
-        val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
-        return result;
+    protected def ListenableFuture<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
+        return router.invokeRpc(rpc, input);
     }
 
     // Validation
@@ -111,15 +104,15 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
         sessions.remove(consumerContextImpl);
         providerSessions.remove(consumerContextImpl);
     }
-    
+
     override close() throws Exception {
         deactivator?.close();
     }
-    
+
     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
         router.addRpcImplementation(rpcType,implementation);
     }
-    
+
     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         router.addRoutedRpcImplementation(rpcType,implementation);
     }
@@ -131,17 +124,17 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     override addRpcRegistrationListener(RpcRegistrationListener listener) {
         return router.addRpcRegistrationListener(listener);
     }
-    
+
     override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
         return router.registerRouteChangeListener(listener);
     }
 
-    override invokeRpc(QName rpc,CompositeNode input){
-        return router.invokeRpc(rpc,input)
-    }
-
     override getSupportedRpcs() {
         return router.getSupportedRpcs();
     }
-    
+
+    override invokeRpc(QName rpc, CompositeNode input) {
+        return router.invokeRpc(rpc,input)
+    }
+
 }
index f9f977e..263f050 100644 (file)
@@ -123,9 +123,8 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
         return rpcs.getSupportedRpcs();
     }
 
-
     @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
         return rpcs.invokeRpc(rpc, input);
     }
 
@@ -134,7 +133,6 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
         return rpcs.addRpcRegistrationListener(listener);
     }
 
-
     @Override
     public ListenableFuture<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
         return null;
@@ -228,6 +226,4 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
             L listener) {
         return rpcs.registerRouteChangeListener(listener);
     }
-
-
 }
index 598361c..3e7b115 100644 (file)
@@ -46,6 +46,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
 
@@ -85,16 +86,16 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
         this.schemaProvider = schemaProvider;
     }
 
-  public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
-    return defaultDelegate;
-  }
+    public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+        return defaultDelegate;
+    }
 
     @Override
-  public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
-    this.defaultDelegate = defaultDelegate;
-  }
+    public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+        this.defaultDelegate = defaultDelegate;
+    }
 
-  @Override
+    @Override
     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         checkArgument(rpcType != null, "RPC Type should not be null");
         checkArgument(implementation != null, "RPC Implementatoin should not be null");
@@ -163,7 +164,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
     }
 
     @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
         return findRpcImplemention(rpc).invokeRpc(rpc, input);
     }
 
@@ -235,7 +236,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
     }
 
     @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
       checkState(defaultDelegate != null);
       return defaultDelegate.invokeRpc(rpc, identifier, input);
     }
@@ -319,7 +320,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
         }
 
         @Override
-        public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+        public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
             checkArgument(inputContainer != null, "Rpc payload must contain input element");
             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
index 40842c0..6e44cba 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.sal.dom.broker.osgi;
 
+import java.util.Set;
+
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.sal.core.api.*;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -17,7 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.osgi.framework.ServiceReference;
 
-import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcProvisionRegistry>
                                        implements RpcProvisionRegistry {
@@ -41,24 +43,23 @@ public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcPro
         return getDelegate().addRoutedRpcImplementation(rpcType, implementation);
     }
 
-  @Override
-  public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
-    getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
-  }
+    @Override
+    public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+        getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
+    }
 
-  @Override
+    @Override
     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
         return getDelegate().registerRouteChangeListener(listener);
     }
 
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        return getDelegate().getSupportedRpcs();
+    }
 
-  @Override
-  public Set<QName> getSupportedRpcs() {
-    return getDelegate().getSupportedRpcs();
-  }
-
-  @Override
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-    return getDelegate().invokeRpc(rpc,input);
-  }
+    @Override
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+        return getDelegate().invokeRpc(rpc, input);
+    }
 }
index 02419ff..2976c76 100644 (file)
@@ -8,27 +8,20 @@
 package org.opendaylight.controller.sal.dom.broker.spi;
 
 import java.util.Map;
-import java.util.Set;
 
 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
 public interface RoutedRpcProcessor extends RpcImplementation {
 
-    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+    RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
 
-    public Set<QName> getSupportedRpcs();
-
-    public QName getRpcType();
-    
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+    QName getRpcType();
 
     Map<InstanceIdentifier,RpcImplementation> getRoutes();
-    
+
     RpcImplementation getDefaultRoute();
 
 }
index d1523a0..b19dac5 100644 (file)
@@ -14,8 +14,6 @@ import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
 public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
 
@@ -28,7 +26,4 @@ public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
 
     @Override
     public Set<QName> getSupportedRpcs();
-
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
 }
index 4ae84c7..3eb0472 100644 (file)
@@ -55,6 +55,7 @@ import static com.google.common.base.Preconditions.*
 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
 
 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import com.google.common.util.concurrent.Futures
 
 class NetconfDevice implements Provider, // 
 DataReader<InstanceIdentifier, CompositeNode>, //
@@ -203,13 +204,13 @@ AutoCloseable {
 
     override readConfigurationData(InstanceIdentifier path) {
         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
-            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
+            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
         return data?.findNode(path) as CompositeNode;
     }
 
     override readOperationalData(InstanceIdentifier path) {
-        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
+        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
         return data?.findNode(path) as CompositeNode;
     }
@@ -218,19 +219,18 @@ AutoCloseable {
         Collections.emptySet;
     }
 
-    def createSubscription(String streamName) {
-        val it = ImmutableCompositeNode.builder()
-        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
-        addLeaf("stream", streamName);
-        invokeRpc(QName, toInstance())
-    }
+//    def createSubscription(String streamName) {
+//        val it = ImmutableCompositeNode.builder()
+//        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
+//        addLeaf("stream", streamName);
+//        invokeRpc(QName, toInstance())
+//    }
 
     override invokeRpc(QName rpc, CompositeNode input) {
         try {
             val message = rpc.toRpcMessage(input,schemaContext);
             val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
-            return result.toRpcResult(rpc, schemaContext);
-
+            return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
         } catch (Exception e) {
             logger.error("Rpc was not processed correctly.", e)
             throw e;
@@ -342,7 +342,6 @@ AutoCloseable {
         operReaderReg?.close()
         client?.close()
     }
-
 }
 
 package class NetconfDeviceSchemaContextProvider {
index c5390e5..9ec3aa3 100644 (file)
@@ -15,14 +15,17 @@ import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NET
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_RUNNING_QNAME;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_TARGET_QNAME;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
 
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
@@ -31,6 +34,8 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -38,7 +43,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
-
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
     private final NetconfDevice device;
     private final DataModification<InstanceIdentifier, CompositeNode> modification;
     private final boolean candidateSupported = true;
@@ -50,7 +55,7 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac
         this.modification = modification;
     }
 
-    public void prepare() {
+    void prepare() throws InterruptedException, ExecutionException {
         for (InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) {
             sendDelete(toRemove);
         }
@@ -60,20 +65,20 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac
 
     }
 
-    private void sendMerge(InstanceIdentifier key, CompositeNode value) {
+    private void sendMerge(InstanceIdentifier key, CompositeNode value) throws InterruptedException, ExecutionException {
         sendEditRpc(createEditStructure(key, Optional.<String>absent(), Optional.of(value)));
     }
 
-    private void sendDelete(InstanceIdentifier toDelete) {
+    private void sendDelete(InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
         sendEditRpc(createEditStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode> absent()));
     }
 
-    private void sendEditRpc(CompositeNode editStructure) {
+    private void sendEditRpc(CompositeNode editStructure) throws InterruptedException, ExecutionException {
         CompositeNodeBuilder<ImmutableCompositeNode> builder = configurationRpcBuilder();
         builder.setQName(NETCONF_EDIT_CONFIG_QNAME);
         builder.add(editStructure);
 
-        RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance());
+        RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get();
         Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful");
 
     }
@@ -135,8 +140,45 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac
     public RpcResult<Void> finish() {
         CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
         commitInput.setQName(NETCONF_COMMIT_QNAME);
-        RpcResult<?> rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance());
-        return (RpcResult<Void>) rpcResult;
+        try {
+            final RpcResult<?> rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance()).get();
+            return new RpcResult<Void>() {
+
+                @Override
+                public boolean isSuccessful() {
+                    return rpcResult.isSuccessful();
+                }
+
+                @Override
+                public Void getResult() {
+                    return null;
+                }
+
+                @Override
+                public Collection<RpcError> getErrors() {
+                    return rpcResult.getErrors();
+                }
+            };
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.warn("Failed to finish operation", e);
+            return new RpcResult<Void>() {
+                @Override
+                public boolean isSuccessful() {
+                    return false;
+                }
+
+                @Override
+                public Void getResult() {
+                    return null;
+                }
+
+                @Override
+                public Collection<RpcError> getErrors() {
+                    // FIXME: wrap the exception
+                    return Collections.emptySet();
+                }
+            };
+        }
     }
 
     @Override
index fa6b6f7..1932726 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.sal.connect.netconf;
 
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -44,15 +45,19 @@ class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String>
         }
 
         device.logger.trace("Loading YANG schema source for {}:{}", moduleName, revision);
-        RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance());
-        if (schemaReply.isSuccessful()) {
-            String schemaBody = getSchemaFromRpc(schemaReply.getResult());
-            if (schemaBody != null) {
-                device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision);
-                return Optional.of(schemaBody);
+        try {
+            RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance()).get();
+            if (schemaReply.isSuccessful()) {
+                String schemaBody = getSchemaFromRpc(schemaReply.getResult());
+                if (schemaBody != null) {
+                    device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision);
+                    return Optional.of(schemaBody);
+                }
             }
+            device.logger.warn("YANG shcema was not successfully retrieved.");
+        } catch (InterruptedException | ExecutionException e) {
+            device.logger.warn("YANG shcema was not successfully retrieved.", e);
         }
-        device.logger.warn("YANG shcema was not successfully retrieved.");
         return Optional.absent();
     }
 
index 84df2e4..8f95e73 100644 (file)
@@ -7,7 +7,16 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
@@ -27,16 +36,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * An implementation of {@link RpcImplementation} that makes
@@ -46,8 +48,8 @@ public class ClientImpl implements RemoteRpcClient {
 
   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
 
-  private ZMQ.Context context = ZMQ.context(1);
-  private ClientRequestHandler handler;
+  private final ZMQ.Context context = ZMQ.context(1);
+  private final ClientRequestHandler handler;
   private RoutingTableProvider routingTableProvider;
 
   public ClientImpl(){
@@ -64,6 +66,7 @@ public class ClientImpl implements RemoteRpcClient {
     return routingTableProvider;
   }
 
+  @Override
   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
     this.routingTableProvider = routingTableProvider;
   }
@@ -93,7 +96,7 @@ public class ClientImpl implements RemoteRpcClient {
    * @param input payload for the remote service
    * @return
    */
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
     routeId.setType(rpc);
 
@@ -115,7 +118,7 @@ public class ClientImpl implements RemoteRpcClient {
    *          payload
    * @return
    */
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
 
     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
     routeId.setType(rpc);
@@ -126,7 +129,7 @@ public class ClientImpl implements RemoteRpcClient {
     return sendMessage(input, routeId, address);
   }
 
-  private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
+  private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
     Message request = new Message.MessageBuilder()
         .type(Message.MessageType.REQUEST)
         .sender(Context.getInstance().getLocalUri())
@@ -164,11 +167,11 @@ public class ClientImpl implements RemoteRpcClient {
 
         }
       }
-      return Rpcs.getRpcResult(true, payload, errors);
+      return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
 
     } catch (Exception e){
       collectErrors(e, errors);
-      return Rpcs.getRpcResult(false, null, errors);
+      return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
     }
   }
 
index 16e7200..edcef83 100644 (file)
@@ -8,7 +8,14 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
@@ -32,13 +39,8 @@ import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class RemoteRpcProvider implements
     RpcImplementation,
@@ -46,7 +48,7 @@ public class RemoteRpcProvider implements
     AutoCloseable,
     Provider {
 
-  private Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
+  private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
 
   private final ServerImpl server;
   private final ClientImpl client;
@@ -96,12 +98,12 @@ public class RemoteRpcProvider implements
   }
 
   @Override
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
     return client.invokeRpc(rpc, input);
   }
 
   @Override
-  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
     return client.invokeRpc(rpc, identifier, input);
   }
 
@@ -289,8 +291,5 @@ public class RemoteRpcProvider implements
       return routeIdSet;
     }
 
-
-
   }
-
 }
index c0aae2d..0fa12e3 100644 (file)
@@ -7,29 +7,25 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc;
 
-import com.google.common.base.Optional;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
 import junit.framework.Assert;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
-import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
-import java.io.IOException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import com.google.common.base.Optional;
 
 /**
  *
@@ -88,7 +84,7 @@ public class ClientImplTest {
     when(mockHandler.handle(any(Message.class))).
             thenReturn(MessagingUtil.createEmptyMessage());
 
-    RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+    RpcResult<CompositeNode> result = client.invokeRpc(null, null).get();
 
     Assert.assertTrue(result.isSuccessful());
     Assert.assertTrue(result.getErrors().isEmpty());
@@ -101,7 +97,7 @@ public class ClientImplTest {
     when(mockHandler.handle(any(Message.class))).
             thenThrow(new IOException());
 
-    RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+    RpcResult<CompositeNode> result = client.invokeRpc(null, null).get();
 
     Assert.assertFalse(result.isSuccessful());
     Assert.assertFalse(result.getErrors().isEmpty());

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.