- public Set<QName> getSupportedRpcs() {
- // TODO : check if we need to get this from routing registry
- return Collections.emptySet();
- }
-
- @Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
- return executeMsg(rpcMsg);
- }
-
- private ListenableFuture<RpcResult<CompositeNode>> executeMsg(InvokeRpc rpcMsg) {
-
- final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
-
- scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
-
- OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object reply) throws Throwable {
- if(failure != null) {
- LOG.error("InvokeRpc failed", failure);
-
- RpcResult<CompositeNode> rpcResult;
- if(failure instanceof RpcErrorsException) {
- rpcResult = RpcResultBuilder.<CompositeNode>failed().withRpcErrors(
- ((RpcErrorsException)failure).getRpcErrors()).build();
- } else {
- rpcResult = RpcResultBuilder.<CompositeNode>failed().withError(
- ErrorType.RPC, failure.getMessage(), failure).build();
- }
-
- listenableFuture.set(rpcResult);
- return;
- }
-
- RpcResponse rpcReply = (RpcResponse)reply;
- CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode());
- listenableFuture.set(RpcResultBuilder.success(result).build());
- }
- };
-
- future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
-
- return listenableFuture;