- private void invokeRemoteRpc(final InvokeRpc msg) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
- }
- final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
- null, msg.getRpc(), msg.getIdentifier());
- final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
-
- final scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
-
- final ActorRef sender = getSender();
- final ActorRef self = self();
-
- final OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object reply) throws Throwable {
- if(failure != null) {
- LOG.error("FindRouters failed", failure);
- sender.tell(new akka.actor.Status.Failure(failure), self);
- return;
- }
-
- final RpcRegistry.Messages.FindRoutersReply findReply =
- (RpcRegistry.Messages.FindRoutersReply)reply;
-
- final List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
-
- if(actorRefList == null || actorRefList.isEmpty()) {
- sender.tell(new akka.actor.Status.Failure(new DOMRpcImplementationNotAvailableException(
- "No remote implementation available for rpc %s", msg.getRpc())), self);
- return;
- }
- finishInvokeRpc(actorRefList, msg, sender, self);
- }
- };
-
- future.onComplete(onComplete, getContext().dispatcher());
- }
-
- protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
- final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
-
- final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
-
- final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput());
- final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc());
-
- final scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
-
- final OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object reply) throws Throwable {
- if(failure != null) {
- LOG.error("ExecuteRpc failed", failure);
- sender.tell(new akka.actor.Status.Failure(failure), self);
- return;
- }
-
- LOG.debug("Execute Rpc response received for rpc : {}, responding to sender : {}", msg.getRpc(), sender);
-
- sender.tell(reply, self);
- }
- };
-
- future.onComplete(onComplete, getContext().dispatcher());
- }
-