import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
public class BindingIndependentConnector implements //
RuntimeDataProvider, //
}
}
+
@Override
- public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
checkArgument(rpc != null);
checkArgument(domInput != null);
RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
checkState(rpcService != null);
CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+
try {
- return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
+ return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
} catch (Exception e) {
- throw new IllegalStateException(e);
+ return Futures.immediateFailedFuture(e);
}
}
}
@Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if(biRpcRegistry != null) {
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
- RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
- Object baResultValue = null;
- if (result.getResult() != null) {
- baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
- }
- RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
- return Futures.<RpcResult<?>> immediateFuture(baResult);
+ public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+ if(biRpcRegistry == null) {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
- return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
- }
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+ Object baResultValue = null;
+ if (input.getResult() != null) {
+ baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), input.getResult());
+ }
+ return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+ }
+ });
+ }
}
private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
}
@Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if(biRpcRegistry != null) {
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
- RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
- Object baResultValue = null;
- RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
- return Futures.<RpcResult<?>>immediateFuture(baResult);
+ public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+ if(biRpcRegistry == null) {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
- return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
- }
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(RpcResult<CompositeNode> input) {
+ return Rpcs.<Void>getRpcResult(input.isSuccessful(), null, input.getErrors());
+ }
+ });
+ }
}
public boolean isRpcForwarding() {
private void startDomBroker() {
checkState(executor != null);
biBrokerImpl = new BrokerImpl();
- biBrokerImpl.setExecutor(executor);
biBrokerImpl.setRouter(new SchemaAwareRpcBroker("/", this));
}
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.core.api;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import com.google.common.util.concurrent.ListenableFuture;
+
public interface RoutedRpcDefaultImplementation {
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
+ ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
}
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import com.google.common.util.concurrent.ListenableFuture;
+
/**
* {@link Provider}'s implementation of an RPC.
*
* {@link RpcResult}
* <li> {@link Broker} returns the {@link RpcResult} to {@link Consumer}
* </ol>
- *
- *
*/
public interface RpcImplementation extends Provider.ProviderFunctionality {
Set<QName> getSupportedRpcs();
/**
- * Invokes a implementation of specified rpc.
- *
+ * Invokes a implementation of specified RPC asynchronously.
*
* @param rpc
- * Rpc to be invoked
+ * RPC to be invoked
* @param input
- * Input data for rpc.
+ * Input data for the RPC.
*
* @throws IllegalArgumentException
* <ul>
* <li>If input is not <code>null</code> and
* <code>false == rpc.equals(input.getNodeType)</code>
* </ul>
- * @return RpcResult containing the output of rpc if was executed
- * successfully, the list of errors otherwise.
+ * @return Future promising an RpcResult containing the output of
+ * the RPC if was executed successfully, the list of errors
+ * otherwise.
*/
- RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
-
+ ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input);
}
*/
package org.opendaylight.controller.sal.dom.broker;
+import com.google.common.util.concurrent.ListenableFuture
import java.util.Collections
import java.util.HashSet
import java.util.Set
-import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
import org.opendaylight.controller.sal.core.api.Broker
import org.opendaylight.controller.sal.core.api.Consumer
import org.opendaylight.controller.sal.core.api.Provider
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
import org.opendaylight.controller.sal.core.api.RpcRoutingContext
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
private val Set<ProviderContextImpl> providerSessions = Collections.synchronizedSet(
new HashSet<ProviderContextImpl>());
- // Implementation specific
- @Property
- private var ExecutorService executor = Executors.newFixedThreadPool(5);
@Property
private var BundleContext bundleContext;
-
+
@Property
private var AutoCloseable deactivator;
return session;
}
- protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
- val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
- return result;
+ protected def ListenableFuture<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
+ return router.invokeRpc(rpc, input);
}
// Validation
sessions.remove(consumerContextImpl);
providerSessions.remove(consumerContextImpl);
}
-
+
override close() throws Exception {
deactivator?.close();
}
-
+
override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
router.addRpcImplementation(rpcType,implementation);
}
-
+
override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
router.addRoutedRpcImplementation(rpcType,implementation);
}
override addRpcRegistrationListener(RpcRegistrationListener listener) {
return router.addRpcRegistrationListener(listener);
}
-
+
override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
return router.registerRouteChangeListener(listener);
}
- override invokeRpc(QName rpc,CompositeNode input){
- return router.invokeRpc(rpc,input)
- }
-
override getSupportedRpcs() {
return router.getSupportedRpcs();
}
-
+
+ override invokeRpc(QName rpc, CompositeNode input) {
+ return router.invokeRpc(rpc,input)
+ }
+
}
return rpcs.getSupportedRpcs();
}
-
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return rpcs.invokeRpc(rpc, input);
}
return rpcs.addRpcRegistrationListener(listener);
}
-
@Override
public ListenableFuture<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
return null;
L listener) {
return rpcs.registerRouteChangeListener(listener);
}
-
-
}
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
this.schemaProvider = schemaProvider;
}
- public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
- return defaultDelegate;
- }
+ public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+ return defaultDelegate;
+ }
@Override
- public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
- this.defaultDelegate = defaultDelegate;
- }
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+ this.defaultDelegate = defaultDelegate;
+ }
- @Override
+ @Override
public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
checkArgument(rpcType != null, "RPC Type should not be null");
checkArgument(implementation != null, "RPC Implementatoin should not be null");
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return findRpcImplemention(rpc).invokeRpc(rpc, input);
}
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
checkState(defaultDelegate != null);
return defaultDelegate.invokeRpc(rpc, identifier, input);
}
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
checkArgument(inputContainer != null, "Rpc payload must contain input element");
SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
package org.opendaylight.controller.sal.dom.broker.osgi;
+import java.util.Set;
+
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.sal.core.api.*;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.osgi.framework.ServiceReference;
-import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcProvisionRegistry>
implements RpcProvisionRegistry {
return getDelegate().addRoutedRpcImplementation(rpcType, implementation);
}
- @Override
- public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
- getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
- }
+ @Override
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+ getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
+ }
- @Override
+ @Override
public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
return getDelegate().registerRouteChangeListener(listener);
}
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return getDelegate().getSupportedRpcs();
+ }
- @Override
- public Set<QName> getSupportedRpcs() {
- return getDelegate().getSupportedRpcs();
- }
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
- return getDelegate().invokeRpc(rpc,input);
- }
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ return getDelegate().invokeRpc(rpc, input);
+ }
}
package org.opendaylight.controller.sal.dom.broker.spi;
import java.util.Map;
-import java.util.Set;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
public interface RoutedRpcProcessor extends RpcImplementation {
- public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+ RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
- public Set<QName> getSupportedRpcs();
-
- public QName getRpcType();
-
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+ QName getRpcType();
Map<InstanceIdentifier,RpcImplementation> getRoutes();
-
+
RpcImplementation getDefaultRoute();
}
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
@Override
public Set<QName> getSupportedRpcs();
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
}