import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
public class BindingIndependentConnector implements //
RuntimeDataProvider, //
}
}
+
@Override
- public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
checkArgument(rpc != null);
checkArgument(domInput != null);
RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
checkState(rpcService != null);
CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+
try {
- return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
+ return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
} catch (Exception e) {
- throw new IllegalStateException(e);
+ return Futures.immediateFailedFuture(e);
}
}
}
@Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if(biRpcRegistry != null) {
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
- RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
- Object baResultValue = null;
- if (result.getResult() != null) {
- baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
- }
- RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
- return Futures.<RpcResult<?>> immediateFuture(baResult);
+ public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+ if(biRpcRegistry == null) {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
- return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
- }
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+ Object baResultValue = null;
+ if (input.getResult() != null) {
+ baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), input.getResult());
+ }
+ return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+ }
+ });
+ }
}
private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
}
@Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if(biRpcRegistry != null) {
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
- RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
- Object baResultValue = null;
- RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
- return Futures.<RpcResult<?>>immediateFuture(baResult);
+ public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+ if(biRpcRegistry == null) {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
- return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
- }
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+ return Rpcs.<Void>getRpcResult(input.isSuccessful(), null, input.getErrors());
+ }
+ });
+ }
}
public boolean isRpcForwarding() {
private void startDomBroker() {
checkState(executor != null);
biBrokerImpl = new BrokerImpl();
- biBrokerImpl.setExecutor(executor);
biBrokerImpl.setRouter(new SchemaAwareRpcBroker("/", this));
}
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
public class CrossBrokerRpcTest {
public static final NodeId NODE_B = new NodeId("b");
public static final NodeId NODE_C = new NodeId("c");
public static final NodeId NODE_D = new NodeId("d");
-
+
private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "id");
private static final QName ADD_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "add-flow");
public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_C_ID = createBINodeIdentifier(NODE_C);
public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_D_ID = createBINodeIdentifier(NODE_D);
-
+
@Before
public void setup() {
}
@Test
- public void bindingRoutedRpcProvider_DomInvokerTest() {
+ public void bindingRoutedRpcProvider_DomInvokerTest() throws Exception {
flowService//
.registerPath(NodeContext.class, BA_NODE_A_ID) //
CompositeNode addFlowDom = toDomRpc(ADD_FLOW_QNAME, addFlowA);
assertNotNull(addFlowDom);
- RpcResult<CompositeNode> domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom);
+ RpcResult<CompositeNode> domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom).get();
assertNotNull(domResult);
assertTrue("DOM result is successful.", domResult.isSuccessful());
assertTrue("Bidning Add Flow RPC was captured.", flowService.getReceivedAddFlows().containsKey(BA_NODE_A_ID));
final AddFlowOutput output = builder.build();
org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration registration = biRpcRegistry.addRoutedRpcImplementation(ADD_FLOW_QNAME, new RpcImplementation() {
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
- CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output);
- return Rpcs.getRpcResult(true, result, ImmutableList.<RpcError>of());
+ public Set<QName> getSupportedRpcs() {
+ return ImmutableSet.of(ADD_FLOW_QNAME);
}
@Override
- public Set<QName> getSupportedRpcs() {
- return ImmutableSet.of(ADD_FLOW_QNAME);
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ CompositeNode result = testContext.getBindingToDomMappingService().toDataDom(output);
+ return Futures.immediateFuture(Rpcs.getRpcResult(true, result, ImmutableList.<RpcError>of()));
}
});
registration.registerPath(NodeContext.QNAME, BI_NODE_C_ID);
-
+
SalFlowService baFlowInvoker = baRpcRegistry.getRpcService(SalFlowService.class);
Future<RpcResult<AddFlowOutput>> baResult = baFlowInvoker.addFlow(addFlow(BA_NODE_C_ID).setPriority(500).build());
assertNotNull(baResult);
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.core.api;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import com.google.common.util.concurrent.ListenableFuture;
+
public interface RoutedRpcDefaultImplementation {
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
+ ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
}
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import com.google.common.util.concurrent.ListenableFuture;
+
/**
* {@link Provider}'s implementation of an RPC.
*
* {@link RpcResult}
* <li> {@link Broker} returns the {@link RpcResult} to {@link Consumer}
* </ol>
- *
- *
*/
public interface RpcImplementation extends Provider.ProviderFunctionality {
Set<QName> getSupportedRpcs();
/**
- * Invokes a implementation of specified rpc.
- *
+ * Invokes a implementation of specified RPC asynchronously.
*
* @param rpc
- * Rpc to be invoked
+ * RPC to be invoked
* @param input
- * Input data for rpc.
+ * Input data for the RPC.
*
* @throws IllegalArgumentException
* <ul>
* <li>If input is not <code>null</code> and
* <code>false == rpc.equals(input.getNodeType)</code>
* </ul>
- * @return RpcResult containing the output of rpc if was executed
- * successfully, the list of errors otherwise.
+ * @return Future promising an RpcResult containing the output of
+ * the RPC if was executed successfully, the list of errors
+ * otherwise.
*/
- RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
-
+ ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input);
}
*/
package org.opendaylight.controller.sal.dom.broker;
+import com.google.common.util.concurrent.ListenableFuture
import java.util.Collections
import java.util.HashSet
import java.util.Set
-import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
import org.opendaylight.controller.sal.core.api.Broker
import org.opendaylight.controller.sal.core.api.Consumer
import org.opendaylight.controller.sal.core.api.Provider
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
import org.opendaylight.controller.sal.core.api.RpcRoutingContext
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
new HashSet<ProviderContextImpl>());
- // Implementation specific
- @Property
- private var ExecutorService executor = Executors.newFixedThreadPool(5);
@Property
private var BundleContext bundleContext;
-
+
@Property
private var AutoCloseable deactivator;
return session;
}
- protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
- val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
- return result;
+ protected def ListenableFuture<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
+ return router.invokeRpc(rpc, input);
}
// Validation
sessions.remove(consumerContextImpl);
providerSessions.remove(consumerContextImpl);
}
-
+
override close() throws Exception {
deactivator?.close();
}
-
+
override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
router.addRpcImplementation(rpcType,implementation);
}
-
+
override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
router.addRoutedRpcImplementation(rpcType,implementation);
}
override addRpcRegistrationListener(RpcRegistrationListener listener) {
return router.addRpcRegistrationListener(listener);
}
-
+
override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
return router.registerRouteChangeListener(listener);
}
- override invokeRpc(QName rpc,CompositeNode input){
- return router.invokeRpc(rpc,input)
- }
-
override getSupportedRpcs() {
return router.getSupportedRpcs();
}
-
+
+ override invokeRpc(QName rpc, CompositeNode input) {
+ return router.invokeRpc(rpc,input)
+ }
+
}
return rpcs.getSupportedRpcs();
}
-
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return rpcs.invokeRpc(rpc, input);
}
return rpcs.addRpcRegistrationListener(listener);
}
-
@Override
public ListenableFuture<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
return null;
L listener) {
return rpcs.registerRouteChangeListener(listener);
}
-
-
}
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
this.schemaProvider = schemaProvider;
}
- public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
- return defaultDelegate;
- }
+ public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+ return defaultDelegate;
+ }
@Override
- public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
- this.defaultDelegate = defaultDelegate;
- }
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+ this.defaultDelegate = defaultDelegate;
+ }
- @Override
+ @Override
public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
checkArgument(rpcType != null, "RPC Type should not be null");
checkArgument(implementation != null, "RPC Implementatoin should not be null");
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return findRpcImplemention(rpc).invokeRpc(rpc, input);
}
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
checkState(defaultDelegate != null);
return defaultDelegate.invokeRpc(rpc, identifier, input);
}
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
checkArgument(inputContainer != null, "Rpc payload must contain input element");
SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
package org.opendaylight.controller.sal.dom.broker.osgi;
+import java.util.Set;
+
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.sal.core.api.*;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.osgi.framework.ServiceReference;
-import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcProvisionRegistry>
implements RpcProvisionRegistry {
return getDelegate().addRoutedRpcImplementation(rpcType, implementation);
}
- @Override
- public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
- getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
- }
+ @Override
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+ getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
+ }
- @Override
+ @Override
public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
return getDelegate().registerRouteChangeListener(listener);
}
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return getDelegate().getSupportedRpcs();
+ }
- @Override
- public Set<QName> getSupportedRpcs() {
- return getDelegate().getSupportedRpcs();
- }
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
- return getDelegate().invokeRpc(rpc,input);
- }
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ return getDelegate().invokeRpc(rpc, input);
+ }
}
package org.opendaylight.controller.sal.dom.broker.spi;
import java.util.Map;
-import java.util.Set;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
public interface RoutedRpcProcessor extends RpcImplementation {
- public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+ RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
- public Set<QName> getSupportedRpcs();
-
- public QName getRpcType();
-
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+ QName getRpcType();
Map<InstanceIdentifier,RpcImplementation> getRoutes();
-
+
RpcImplementation getDefaultRoute();
}
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
@Override
public Set<QName> getSupportedRpcs();
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
}
import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import com.google.common.util.concurrent.Futures
class NetconfDevice implements Provider, //
DataReader<InstanceIdentifier, CompositeNode>, //
override readConfigurationData(InstanceIdentifier path) {
val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
- wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
+ wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
return data?.findNode(path) as CompositeNode;
}
override readOperationalData(InstanceIdentifier path) {
- val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
+ val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
return data?.findNode(path) as CompositeNode;
}
Collections.emptySet;
}
- def createSubscription(String streamName) {
- val it = ImmutableCompositeNode.builder()
- QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
- addLeaf("stream", streamName);
- invokeRpc(QName, toInstance())
- }
+// def createSubscription(String streamName) {
+// val it = ImmutableCompositeNode.builder()
+// QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
+// addLeaf("stream", streamName);
+// invokeRpc(QName, toInstance())
+// }
override invokeRpc(QName rpc, CompositeNode input) {
try {
val message = rpc.toRpcMessage(input,schemaContext);
val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
- return result.toRpcResult(rpc, schemaContext);
-
+ return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
} catch (Exception e) {
logger.error("Rpc was not processed correctly.", e)
throw e;
operReaderReg?.close()
client?.close()
}
-
}
package class NetconfDeviceSchemaContextProvider {
import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_RUNNING_QNAME;
import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_TARGET_QNAME;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
-
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
private final NetconfDevice device;
private final DataModification<InstanceIdentifier, CompositeNode> modification;
private final boolean candidateSupported = true;
this.modification = modification;
}
- public void prepare() {
+ void prepare() throws InterruptedException, ExecutionException {
for (InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) {
sendDelete(toRemove);
}
}
- private void sendMerge(InstanceIdentifier key, CompositeNode value) {
+ private void sendMerge(InstanceIdentifier key, CompositeNode value) throws InterruptedException, ExecutionException {
sendEditRpc(createEditStructure(key, Optional.<String>absent(), Optional.of(value)));
}
- private void sendDelete(InstanceIdentifier toDelete) {
+ private void sendDelete(InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
sendEditRpc(createEditStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode> absent()));
}
- private void sendEditRpc(CompositeNode editStructure) {
+ private void sendEditRpc(CompositeNode editStructure) throws InterruptedException, ExecutionException {
CompositeNodeBuilder<ImmutableCompositeNode> builder = configurationRpcBuilder();
builder.setQName(NETCONF_EDIT_CONFIG_QNAME);
builder.add(editStructure);
- RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance());
+ RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get();
Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful");
}
public RpcResult<Void> finish() {
CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
commitInput.setQName(NETCONF_COMMIT_QNAME);
- RpcResult<?> rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance());
- return (RpcResult<Void>) rpcResult;
+ try {
+ final RpcResult<?> rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance()).get();
+ return new RpcResult<Void>() {
+
+ @Override
+ public boolean isSuccessful() {
+ return rpcResult.isSuccessful();
+ }
+
+ @Override
+ public Void getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ return rpcResult.getErrors();
+ }
+ };
+ } catch (final InterruptedException | ExecutionException e) {
+ LOG.warn("Failed to finish operation", e);
+ return new RpcResult<Void>() {
+ @Override
+ public boolean isSuccessful() {
+ return false;
+ }
+
+ @Override
+ public Void getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ // FIXME: wrap the exception
+ return Collections.emptySet();
+ }
+ };
+ }
}
@Override
package org.opendaylight.controller.sal.connect.netconf;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
}
device.logger.trace("Loading YANG schema source for {}:{}", moduleName, revision);
- RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance());
- if (schemaReply.isSuccessful()) {
- String schemaBody = getSchemaFromRpc(schemaReply.getResult());
- if (schemaBody != null) {
- device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision);
- return Optional.of(schemaBody);
+ try {
+ RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance()).get();
+ if (schemaReply.isSuccessful()) {
+ String schemaBody = getSchemaFromRpc(schemaReply.getResult());
+ if (schemaBody != null) {
+ device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision);
+ return Optional.of(schemaBody);
+ }
}
+ device.logger.warn("YANG shcema was not successfully retrieved.");
+ } catch (InterruptedException | ExecutionException e) {
+ device.logger.warn("YANG shcema was not successfully retrieved.", e);
}
- device.logger.warn("YANG shcema was not successfully retrieved.");
return Optional.absent();
}
package org.opendaylight.controller.sal.connector.remoterpc;
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* An implementation of {@link RpcImplementation} that makes
private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
- private ZMQ.Context context = ZMQ.context(1);
- private ClientRequestHandler handler;
+ private final ZMQ.Context context = ZMQ.context(1);
+ private final ClientRequestHandler handler;
private RoutingTableProvider routingTableProvider;
public ClientImpl(){
return routingTableProvider;
}
+ @Override
public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
this.routingTableProvider = routingTableProvider;
}
* @param input payload for the remote service
* @return
*/
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(rpc);
* payload
* @return
*/
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(rpc);
return sendMessage(input, routeId, address);
}
- private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
+ private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
Message request = new Message.MessageBuilder()
.type(Message.MessageType.REQUEST)
.sender(Context.getInstance().getLocalUri())
}
}
- return Rpcs.getRpcResult(true, payload, errors);
+ return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
} catch (Exception e){
collectErrors(e, errors);
- return Rpcs.getRpcResult(false, null, errors);
+ return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
}
}
package org.opendaylight.controller.sal.connector.remoterpc;
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
public class RemoteRpcProvider implements
RpcImplementation,
AutoCloseable,
Provider {
- private Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
+ private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
private final ServerImpl server;
private final ClientImpl client;
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return client.invokeRpc(rpc, input);
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
return client.invokeRpc(rpc, identifier, input);
}
return routeIdSet;
}
-
-
}
-
}
*/
package org.opendaylight.controller.sal.connector.remoterpc;
-import com.google.common.base.Optional;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
import junit.framework.Assert;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
-import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import java.io.IOException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import com.google.common.base.Optional;
/**
*
when(mockHandler.handle(any(Message.class))).
thenReturn(MessagingUtil.createEmptyMessage());
- RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+ RpcResult<CompositeNode> result = client.invokeRpc(null, null).get();
Assert.assertTrue(result.isSuccessful());
Assert.assertTrue(result.getErrors().isEmpty());
when(mockHandler.handle(any(Message.class))).
thenThrow(new IOException());
- RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+ RpcResult<CompositeNode> result = client.invokeRpc(null, null).get();
Assert.assertFalse(result.isSuccessful());
Assert.assertFalse(result.getErrors().isEmpty());