+ private void invokeRemoteRpc(final InvokeRpc msg) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+ }
+ RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
+ null, msg.getRpc(), msg.getIdentifier());
+ RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+
+ scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
+
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("FindRouters failed", failure);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ return;
+ }
+
+ RpcRegistry.Messages.FindRoutersReply findReply =
+ (RpcRegistry.Messages.FindRoutersReply)reply;
+
+ List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
+
+ if(actorRefList == null || actorRefList.isEmpty()) {
+ String message = String.format(
+ "No remote implementation found for rpc %s", msg.getRpc());
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+ message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+ "operation-not-supported", message)))), self);
+ return;
+ }
+
+ finishInvokeRpc(actorRefList, msg, sender, self);
+ }
+ };
+
+ future.onComplete(onComplete, getContext().dispatcher());
+ }
+
+ protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
+ final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
+
+ RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
+
+ ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
+ schemaContext), msg.getRpc());
+
+ scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("ExecuteRpc failed", failure);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ return;
+ }