- private ListenableFuture<RpcResult<CompositeNode>> executeMsg(Object rpcMsg) {
- CompositeNode result = null;
- Collection<RpcError> errors = errors = new ArrayList<>();
- try {
- Object response = ActorUtil.executeLocalOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION);
- if(response instanceof RpcResponse) {
- RpcResponse rpcResponse = (RpcResponse) response;
- result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode());
- } else if(response instanceof ErrorResponse) {
- ErrorResponse errorResponse = (ErrorResponse) response;
- Exception e = errorResponse.getException();
- errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
- }
- } catch (Exception e) {
- LOG.error("Error occurred while invoking RPC actor {}", e.toString());
- errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private Future<FindRoutersReply> findRouteAsync(final DOMRpcIdentifier rpc) {
+ // FIXME: Refactor routeId and message to use DOMRpcIdentifier directly.
+ final RpcRouter.RouteIdentifier<?, ?, ?> routeId =
+ new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
+ final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+ return (Future) ask(rpcRegistry, findMsg, config.getAskDuration());