From: Robert Varga Date: Thu, 27 Feb 2014 08:02:42 +0000 (+0100) Subject: Change RpcImplementation contract to asynchronous X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~235^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=bcdc6138d215d097b13510e08735808ed931aeda Change RpcImplementation contract to asynchronous This changes the method used to invoke RPCs such that they need not be synchronous anymore, but rather return a ListenableFuture. If an implementation is synchronous, it should use Futures.immediateFuture() as a wrapper. Change-Id: I0623d2afda038ba49afa83ed6020910b74b4911e Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java index 9edea0c2fd..93849c2e91 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java @@ -91,6 +91,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; public class BindingIndependentConnector implements // RuntimeDataProvider, // @@ -699,8 +700,9 @@ public class BindingIndependentConnector implements // } } + @Override - public RpcResult invokeRpc(final QName rpc, final CompositeNode domInput) { + public ListenableFuture> invokeRpc(final QName rpc, final CompositeNode domInput) { checkArgument(rpc != null); checkArgument(domInput != null); @@ -709,10 +711,11 @@ public class BindingIndependentConnector implements // RpcService rpcService = baRpcRegistry.getRpcService(rpcType); checkState(rpcService != null); CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input")); + try { - return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput); + return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput)); } catch (Exception e) { - throw new IllegalStateException(e); + return Futures.immediateFailedFuture(e); } } @@ -813,21 +816,25 @@ public class BindingIndependentConnector implements // } @Override - public Future> forwardToDomBroker(final DataObject input) { - if(biRpcRegistry != null) { - CompositeNode xml = mappingService.toDataDom(input); - CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); - RpcResult result = biRpcRegistry.invokeRpc(rpc, wrappedXml); - Object baResultValue = null; - if (result.getResult() != null) { - baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult()); - } - RpcResult baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors()); - return Futures.> immediateFuture(baResult); + public ListenableFuture> forwardToDomBroker(final DataObject input) { + if(biRpcRegistry == null) { + return Futures.> immediateFuture(Rpcs.getRpcResult(false)); } - return Futures.> immediateFuture(Rpcs.getRpcResult(false)); - } + CompositeNode xml = mappingService.toDataDom(input); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); + + return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function, RpcResult>() { + @Override + public RpcResult apply(RpcResult input) { + Object baResultValue = null; + if (input.getResult() != null) { + baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), input.getResult()); + } + return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors()); + } + }); + } } private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy { @@ -876,18 +883,21 @@ public class BindingIndependentConnector implements // } @Override - public Future> forwardToDomBroker(final DataObject input) { - if(biRpcRegistry != null) { - CompositeNode xml = mappingService.toDataDom(input); - CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.>of(xml)); - RpcResult result = biRpcRegistry.invokeRpc(rpc, wrappedXml); - Object baResultValue = null; - RpcResult baResult = Rpcs.getRpcResult(result.isSuccessful(), null, result.getErrors()); - return Futures.>immediateFuture(baResult); + public ListenableFuture> forwardToDomBroker(final DataObject input) { + if(biRpcRegistry == null) { + return Futures.> immediateFuture(Rpcs.getRpcResult(false)); } - return Futures.>immediateFuture(Rpcs.getRpcResult(false)); - } + CompositeNode xml = mappingService.toDataDom(input); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.>of(xml)); + + return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function, RpcResult>() { + @Override + public RpcResult apply(RpcResult input) { + return Rpcs.getRpcResult(input.isSuccessful(), null, input.getErrors()); + } + }); + } } public boolean isRpcForwarding() { diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java index 4bad2bbb86..d6d87bac84 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java @@ -351,7 +351,6 @@ public class BindingTestContext implements AutoCloseable, SchemaContextProvider private void startDomBroker() { checkState(executor != null); biBrokerImpl = new BrokerImpl(); - biBrokerImpl.setExecutor(executor); biBrokerImpl.setRouter(new SchemaAwareRpcBroker("/", this)); } diff --git a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java index b1547b66a7..ca38ed0797 100644 --- a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java +++ b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java @@ -50,6 +50,7 @@ import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; public class CrossBrokerRpcTest { @@ -64,7 +65,7 @@ public class CrossBrokerRpcTest { public static final NodeId NODE_B = new NodeId("b"); public static final NodeId NODE_C = new NodeId("c"); public static final NodeId NODE_D = new NodeId("d"); - + private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "id"); private static final QName ADD_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "add-flow"); @@ -78,7 +79,7 @@ public class CrossBrokerRpcTest { public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_C_ID = createBINodeIdentifier(NODE_C); public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_D_ID = createBINodeIdentifier(NODE_D); - + @Before public void setup() { @@ -99,7 +100,7 @@ public class CrossBrokerRpcTest { } @Test - public void bindingRoutedRpcProvider_DomInvokerTest() { + public void bindingRoutedRpcProvider_DomInvokerTest() throws Exception { flowService// .registerPath(NodeContext.class, BA_NODE_A_ID) // @@ -114,7 +115,7 @@ public class CrossBrokerRpcTest { CompositeNode addFlowDom = toDomRpc(ADD_FLOW_QNAME, addFlowA); assertNotNull(addFlowDom); - RpcResult domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom); + RpcResult domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom).get(); assertNotNull(domResult); assertTrue("DOM result is successful.", domResult.isSuccessful()); assertTrue("Bidning Add Flow RPC was captured.", flowService.getReceivedAddFlows().containsKey(BA_NODE_A_ID)); @@ -128,18 +129,18 @@ public class CrossBrokerRpcTest { final AddFlowOutput output = builder.build(); org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration registration = biRpcRegistry.addRoutedRpcImplementation(ADD_FLOW_QNAME, new RpcImplementation() { @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output); - return Rpcs.getRpcResult(true, result, ImmutableList.of()); + public Set getSupportedRpcs() { + return ImmutableSet.of(ADD_FLOW_QNAME); } @Override - public Set getSupportedRpcs() { - return ImmutableSet.of(ADD_FLOW_QNAME); + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { + CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output); + return Futures.immediateFuture(Rpcs.getRpcResult(true, result, ImmutableList.of())); } }); registration.registerPath(NodeContext.QNAME, BI_NODE_C_ID); - + SalFlowService baFlowInvoker = baRpcRegistry.getRpcService(SalFlowService.class); Future> baResult = baFlowInvoker.addFlow(addFlow(BA_NODE_C_ID).setPriority(500).build()); assertNotNull(baResult); diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java index c8eb7fd56f..4f11ba0661 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.core.api; import org.opendaylight.yangtools.yang.common.QName; @@ -5,8 +12,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import com.google.common.util.concurrent.ListenableFuture; + public interface RoutedRpcDefaultImplementation { - public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input); + ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input); } diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementation.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementation.java index 6b1030a815..38b33d5d2a 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementation.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcImplementation.java @@ -15,6 +15,8 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import com.google.common.util.concurrent.ListenableFuture; + /** * {@link Provider}'s implementation of an RPC. * @@ -42,8 +44,6 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode; * {@link RpcResult} *
  • {@link Broker} returns the {@link RpcResult} to {@link Consumer} * - * - * */ public interface RpcImplementation extends Provider.ProviderFunctionality { @@ -59,13 +59,12 @@ public interface RpcImplementation extends Provider.ProviderFunctionality { Set getSupportedRpcs(); /** - * Invokes a implementation of specified rpc. - * + * Invokes a implementation of specified RPC asynchronously. * * @param rpc - * Rpc to be invoked + * RPC to be invoked * @param input - * Input data for rpc. + * Input data for the RPC. * * @throws IllegalArgumentException *
      @@ -73,9 +72,9 @@ public interface RpcImplementation extends Provider.ProviderFunctionality { *
    • If input is not null and * false == rpc.equals(input.getNodeType) *
    - * @return RpcResult containing the output of rpc if was executed - * successfully, the list of errors otherwise. + * @return Future promising an RpcResult containing the output of + * the RPC if was executed successfully, the list of errors + * otherwise. */ - RpcResult invokeRpc(QName rpc, CompositeNode input); - + ListenableFuture> invokeRpc(QName rpc, CompositeNode input); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend index 64de8683d1..0ed14c1027 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend @@ -7,13 +7,11 @@ */ package org.opendaylight.controller.sal.dom.broker; +import com.google.common.util.concurrent.ListenableFuture import java.util.Collections import java.util.HashSet import java.util.Set -import java.util.concurrent.Callable -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.Future +import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener import org.opendaylight.controller.sal.core.api.Broker import org.opendaylight.controller.sal.core.api.Consumer import org.opendaylight.controller.sal.core.api.Provider @@ -26,7 +24,6 @@ import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter import org.opendaylight.controller.sal.core.api.RpcRegistrationListener import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry import org.opendaylight.controller.sal.core.api.RpcImplementation -import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener import org.opendaylight.controller.sal.core.api.RpcRoutingContext import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation @@ -39,12 +36,9 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable { private val Set providerSessions = Collections.synchronizedSet( new HashSet()); - // Implementation specific - @Property - private var ExecutorService executor = Executors.newFixedThreadPool(5); @Property private var BundleContext bundleContext; - + @Property private var AutoCloseable deactivator; @@ -69,9 +63,8 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable { return session; } - protected def Future> invokeRpcAsync(QName rpc, CompositeNode input) { - val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable>); - return result; + protected def ListenableFuture> invokeRpcAsync(QName rpc, CompositeNode input) { + return router.invokeRpc(rpc, input); } // Validation @@ -111,15 +104,15 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable { sessions.remove(consumerContextImpl); providerSessions.remove(consumerContextImpl); } - + override close() throws Exception { deactivator?.close(); } - + override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException { router.addRpcImplementation(rpcType,implementation); } - + override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { router.addRoutedRpcImplementation(rpcType,implementation); } @@ -131,17 +124,17 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable { override addRpcRegistrationListener(RpcRegistrationListener listener) { return router.addRpcRegistrationListener(listener); } - + override > registerRouteChangeListener(L listener) { return router.registerRouteChangeListener(listener); } - override invokeRpc(QName rpc,CompositeNode input){ - return router.invokeRpc(rpc,input) - } - override getSupportedRpcs() { return router.getSupportedRpcs(); } - + + override invokeRpc(QName rpc, CompositeNode input) { + return router.invokeRpc(rpc,input) + } + } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java index f9f977e3c2..263f0500fd 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java @@ -123,9 +123,8 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv return rpcs.getSupportedRpcs(); } - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return rpcs.invokeRpc(rpc, input); } @@ -134,7 +133,6 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv return rpcs.addRpcRegistrationListener(listener); } - @Override public ListenableFuture> rpc(QName type, CompositeNode input) { return null; @@ -228,6 +226,4 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv L listener) { return rpcs.registerRouteChangeListener(listener); } - - } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java index 598361c3ae..3e7b115f11 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java @@ -46,6 +46,7 @@ import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, RoutedRpcDefaultImplementation { @@ -85,16 +86,16 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro this.schemaProvider = schemaProvider; } - public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { - return defaultDelegate; - } + public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() { + return defaultDelegate; + } @Override - public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { - this.defaultDelegate = defaultDelegate; - } + public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) { + this.defaultDelegate = defaultDelegate; + } - @Override + @Override public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { checkArgument(rpcType != null, "RPC Type should not be null"); checkArgument(implementation != null, "RPC Implementatoin should not be null"); @@ -163,7 +164,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return findRpcImplemention(rpc).invokeRpc(rpc, input); } @@ -235,7 +236,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { checkState(defaultDelegate != null); return defaultDelegate.invokeRpc(rpc, identifier, input); } @@ -319,7 +320,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable, Ro } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input")); checkArgument(inputContainer != null, "Rpc payload must contain input element"); SimpleNode routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf()); diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java index 40842c004a..6e44cba494 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.sal.dom.broker.osgi; +import java.util.Set; + import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.sal.core.api.*; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -17,7 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.osgi.framework.ServiceReference; -import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy implements RpcProvisionRegistry { @@ -41,24 +43,23 @@ public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy> ListenerRegistration registerRouteChangeListener(L listener) { return getDelegate().registerRouteChangeListener(listener); } + @Override + public Set getSupportedRpcs() { + return getDelegate().getSupportedRpcs(); + } - @Override - public Set getSupportedRpcs() { - return getDelegate().getSupportedRpcs(); - } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - return getDelegate().invokeRpc(rpc,input); - } + @Override + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { + return getDelegate().invokeRpc(rpc, input); + } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java index 02419ff529..2976c76ffa 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java @@ -8,27 +8,20 @@ package org.opendaylight.controller.sal.dom.broker.spi; import java.util.Map; -import java.util.Set; import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; public interface RoutedRpcProcessor extends RpcImplementation { - public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation); + RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation); - public Set getSupportedRpcs(); - - public QName getRpcType(); - - public RpcResult invokeRpc(QName rpc, CompositeNode input); + QName getRpcType(); Map getRoutes(); - + RpcImplementation getDefaultRoute(); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java index d1523a01d6..b19dac5535 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java @@ -14,8 +14,6 @@ import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation { @@ -28,7 +26,4 @@ public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation { @Override public Set getSupportedRpcs(); - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 4ae84c7d31..3eb0472b5c 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -55,6 +55,7 @@ import static com.google.common.base.Preconditions.* import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.* import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* +import com.google.common.util.concurrent.Futures class NetconfDevice implements Provider, // DataReader, // @@ -203,13 +204,13 @@ AutoCloseable { override readConfigurationData(InstanceIdentifier path) { val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, - wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); + wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } override readOperationalData(InstanceIdentifier path) { - val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())); + val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } @@ -218,19 +219,18 @@ AutoCloseable { Collections.emptySet; } - def createSubscription(String streamName) { - val it = ImmutableCompositeNode.builder() - QName = NETCONF_CREATE_SUBSCRIPTION_QNAME - addLeaf("stream", streamName); - invokeRpc(QName, toInstance()) - } +// def createSubscription(String streamName) { +// val it = ImmutableCompositeNode.builder() +// QName = NETCONF_CREATE_SUBSCRIPTION_QNAME +// addLeaf("stream", streamName); +// invokeRpc(QName, toInstance()) +// } override invokeRpc(QName rpc, CompositeNode input) { try { val message = rpc.toRpcMessage(input,schemaContext); val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount); - return result.toRpcResult(rpc, schemaContext); - + return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext)); } catch (Exception e) { logger.error("Rpc was not processed correctly.", e) throw e; @@ -342,7 +342,6 @@ AutoCloseable { operReaderReg?.close() client?.close() } - } package class NetconfDeviceSchemaContextProvider { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java index c5390e5409..9ec3aa3bb0 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java @@ -15,14 +15,17 @@ import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NET import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_RUNNING_QNAME; import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_TARGET_QNAME; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; import org.opendaylight.controller.md.sal.common.api.data.DataModification; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; @@ -31,6 +34,8 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.Node; import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -38,7 +43,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction { - + private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class); private final NetconfDevice device; private final DataModification modification; private final boolean candidateSupported = true; @@ -50,7 +55,7 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac this.modification = modification; } - public void prepare() { + void prepare() throws InterruptedException, ExecutionException { for (InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) { sendDelete(toRemove); } @@ -60,20 +65,20 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac } - private void sendMerge(InstanceIdentifier key, CompositeNode value) { + private void sendMerge(InstanceIdentifier key, CompositeNode value) throws InterruptedException, ExecutionException { sendEditRpc(createEditStructure(key, Optional.absent(), Optional.of(value))); } - private void sendDelete(InstanceIdentifier toDelete) { + private void sendDelete(InstanceIdentifier toDelete) throws InterruptedException, ExecutionException { sendEditRpc(createEditStructure(toDelete, Optional.of("delete"), Optional. absent())); } - private void sendEditRpc(CompositeNode editStructure) { + private void sendEditRpc(CompositeNode editStructure) throws InterruptedException, ExecutionException { CompositeNodeBuilder builder = configurationRpcBuilder(); builder.setQName(NETCONF_EDIT_CONFIG_QNAME); builder.add(editStructure); - RpcResult rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()); + RpcResult rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get(); Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful"); } @@ -135,8 +140,45 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac public RpcResult finish() { CompositeNodeBuilder commitInput = ImmutableCompositeNode.builder(); commitInput.setQName(NETCONF_COMMIT_QNAME); - RpcResult rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance()); - return (RpcResult) rpcResult; + try { + final RpcResult rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance()).get(); + return new RpcResult() { + + @Override + public boolean isSuccessful() { + return rpcResult.isSuccessful(); + } + + @Override + public Void getResult() { + return null; + } + + @Override + public Collection getErrors() { + return rpcResult.getErrors(); + } + }; + } catch (final InterruptedException | ExecutionException e) { + LOG.warn("Failed to finish operation", e); + return new RpcResult() { + @Override + public boolean isSuccessful() { + return false; + } + + @Override + public Void getResult() { + return null; + } + + @Override + public Collection getErrors() { + // FIXME: wrap the exception + return Collections.emptySet(); + } + }; + } } @Override diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java index fa6b6f7ca5..1932726600 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.sal.connect.netconf; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -44,15 +45,19 @@ class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider } device.logger.trace("Loading YANG schema source for {}:{}", moduleName, revision); - RpcResult schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance()); - if (schemaReply.isSuccessful()) { - String schemaBody = getSchemaFromRpc(schemaReply.getResult()); - if (schemaBody != null) { - device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision); - return Optional.of(schemaBody); + try { + RpcResult schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance()).get(); + if (schemaReply.isSuccessful()) { + String schemaBody = getSchemaFromRpc(schemaReply.getResult()); + if (schemaBody != null) { + device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision); + return Optional.of(schemaBody); + } } + device.logger.warn("YANG shcema was not successfully retrieved."); + } catch (InterruptedException | ExecutionException e) { + device.logger.warn("YANG shcema was not successfully retrieved.", e); } - device.logger.warn("YANG shcema was not successfully retrieved."); return Optional.absent(); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java index 84df2e43f0..8f95e73b15 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java @@ -7,7 +7,16 @@ package org.opendaylight.controller.sal.connector.remoterpc; -import com.google.common.base.Optional; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connector.api.RpcRouter; @@ -27,16 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; /** * An implementation of {@link RpcImplementation} that makes @@ -46,8 +48,8 @@ public class ClientImpl implements RemoteRpcClient { private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class); - private ZMQ.Context context = ZMQ.context(1); - private ClientRequestHandler handler; + private final ZMQ.Context context = ZMQ.context(1); + private final ClientRequestHandler handler; private RoutingTableProvider routingTableProvider; public ClientImpl(){ @@ -64,6 +66,7 @@ public class ClientImpl implements RemoteRpcClient { return routingTableProvider; } + @Override public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { this.routingTableProvider = routingTableProvider; } @@ -93,7 +96,7 @@ public class ClientImpl implements RemoteRpcClient { * @param input payload for the remote service * @return */ - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { RouteIdentifierImpl routeId = new RouteIdentifierImpl(); routeId.setType(rpc); @@ -115,7 +118,7 @@ public class ClientImpl implements RemoteRpcClient { * payload * @return */ - public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { RouteIdentifierImpl routeId = new RouteIdentifierImpl(); routeId.setType(rpc); @@ -126,7 +129,7 @@ public class ClientImpl implements RemoteRpcClient { return sendMessage(input, routeId, address); } - private RpcResult sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) { + private ListenableFuture> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) { Message request = new Message.MessageBuilder() .type(Message.MessageType.REQUEST) .sender(Context.getInstance().getLocalUri()) @@ -164,11 +167,11 @@ public class ClientImpl implements RemoteRpcClient { } } - return Rpcs.getRpcResult(true, payload, errors); + return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors)); } catch (Exception e){ collectErrors(e, errors); - return Rpcs.getRpcResult(false, null, errors); + return Futures.immediateFuture(Rpcs.getRpcResult(false, null, errors)); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java index 16e7200247..edcef83574 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java @@ -8,7 +8,14 @@ package org.opendaylight.controller.sal.connector.remoterpc; -import com.google.common.base.Optional; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.sal.connector.api.RpcRouter; @@ -32,13 +39,8 @@ import org.osgi.util.tracker.ServiceTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; public class RemoteRpcProvider implements RpcImplementation, @@ -46,7 +48,7 @@ public class RemoteRpcProvider implements AutoCloseable, Provider { - private Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class); + private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class); private final ServerImpl server; private final ClientImpl client; @@ -96,12 +98,12 @@ public class RemoteRpcProvider implements } @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return client.invokeRpc(rpc, input); } @Override - public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { return client.invokeRpc(rpc, identifier, input); } @@ -289,8 +291,5 @@ public class RemoteRpcProvider implements return routeIdSet; } - - } - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java index c0aae2dfb5..0fa12e351c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java @@ -7,29 +7,25 @@ */ package org.opendaylight.controller.sal.connector.remoterpc; -import com.google.common.base.Optional; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + import junit.framework.Assert; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; -import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; -import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import java.io.IOException; -import java.util.*; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.google.common.base.Optional; /** * @@ -88,7 +84,7 @@ public class ClientImplTest { when(mockHandler.handle(any(Message.class))). thenReturn(MessagingUtil.createEmptyMessage()); - RpcResult result = client.invokeRpc(null, null); + RpcResult result = client.invokeRpc(null, null).get(); Assert.assertTrue(result.isSuccessful()); Assert.assertTrue(result.getErrors().isEmpty()); @@ -101,7 +97,7 @@ public class ClientImplTest { when(mockHandler.handle(any(Message.class))). thenThrow(new IOException()); - RpcResult result = client.invokeRpc(null, null); + RpcResult result = client.invokeRpc(null, null).get(); Assert.assertFalse(result.isSuccessful()); Assert.assertFalse(result.getErrors().isEmpty());