import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
this.schemaProvider = schemaProvider;
}
- public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
- return defaultDelegate;
- }
+ public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+ return defaultDelegate;
+ }
@Override
- public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
- this.defaultDelegate = defaultDelegate;
- }
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+ this.defaultDelegate = defaultDelegate;
+ }
- @Override
+ @Override
public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
checkArgument(rpcType != null, "RPC Type should not be null");
checkArgument(implementation != null, "RPC Implementatoin should not be null");
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
return findRpcImplemention(rpc).invokeRpc(rpc, input);
}
if (potentialImpl != null) {
return potentialImpl;
}
+
potentialImpl = defaultImplementation;
- checkState(potentialImpl != null, "Implementation is not available.");
+ if( potentialImpl == null ) {
+ throw new UnsupportedOperationException( "No implementation for this operation is available." );
+ }
+
return potentialImpl;
}
if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
return Optional.fromNullable(extension.getQName());
}
- ;
}
return Optional.absent();
}
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
checkState(defaultDelegate != null);
return defaultDelegate.invokeRpc(rpc, identifier, input);
}
}
@Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
checkArgument(inputContainer != null, "Rpc payload must contain input element");
SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
Object route = routeContainer.getValue();
- checkArgument(route instanceof InstanceIdentifier);
+ checkArgument(route instanceof InstanceIdentifier,
+ "The routed node %s is not an instance identifier", route);
RpcImplementation potential = null;
if (route != null) {
RoutedRpcRegImpl potentialReg = implementations.get(route);
try {
listener.onRouteChange(initial);
} catch (Exception e) {
- LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
+ LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
}
return reg;
}