- if (input instanceof RemoteRpcInput) {
- LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc);
- return Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
- new DOMRpcImplementationNotAvailableException(
- "Rpc implementation for %s was removed during processing.", rpc));
- }
- final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
- findRouteAsync(rpc).onComplete(new OnComplete<FindRoutersReply>() {
-
- @Override
- public void onComplete(final Throwable error, final FindRoutersReply routes) throws Throwable {
- if (error != null) {
- frontEndFuture.failNow(error);
- } else {
- final List<Pair<ActorRef, Long>> routePairs = routes.getRouterWithUpdateTime();
- if (routePairs == null || routePairs.isEmpty()) {
- frontEndFuture.failNow(new DOMRpcImplementationNotAvailableException(
- "No local or remote implementation available for rpc %s", rpc.getType()));
- } else {
- final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select();
- final Object executeRpcMessage = ExecuteRpc.from(rpc, input);
- LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(),
- executeRpcMessage);
- frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration()));
- }
- }
- }
- }, ExecutionContext.Implicits$.MODULE$.global());
- return frontEndFuture;