- }
-
- private void invokeRemoteRpc(InvokeRpc msg) {
- // Look up the remote actor to execute rpc
- LOG.debug("Looking up the remote actor for route {}", msg);
- try {
- // Find router
- RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
- RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId);
- RpcRegistry.Messages.FindRoutersReply rpcReply =
- (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-
- List<Pair<ActorRef, Long>> actorRefList = rpcReply.getRouterWithUpdateTime();
-
- if(actorRefList == null || actorRefList.isEmpty()) {
- LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc());
-
- getSender().tell(new ErrorResponse(
- new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
- } else {
- RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
-
- ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
- Object operationRes = ActorUtil.executeOperation(logic.select(),
- executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
-
- getSender().tell(operationRes, self());
- }
- } catch (Exception e) {
- LOG.error("invokeRemoteRpc: {}", e);
- getSender().tell(new ErrorResponse(e), self());
+
+ public static Props props(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ Preconditions.checkNotNull(rpcRegistry, "ActorRef can not be null!");
+ Preconditions.checkNotNull(rpcService, "DOMRpcService can not be null");
+ return Props.create(new RpcBrokerCreator(rpcService, rpcRegistry));
+ }
+
+ @Override
+ protected void handleReceive(final Object message) throws Exception {
+ if(message instanceof InvokeRpc) {
+ invokeRemoteRpc((InvokeRpc) message);
+ } else if(message instanceof ExecuteRpc) {
+ executeRpc((ExecuteRpc) message);
+ }
+ }
+
+ private void invokeRemoteRpc(final InvokeRpc msg) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+ }
+ final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
+ null, msg.getRpc(), msg.getIdentifier());
+ final RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+
+ final scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
+
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("FindRouters failed", failure);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ return;
+ }
+
+ final RpcRegistry.Messages.FindRoutersReply findReply =
+ (RpcRegistry.Messages.FindRoutersReply)reply;
+
+ final List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
+
+ if(actorRefList == null || actorRefList.isEmpty()) {
+ final String message = String.format(
+ "No remote implementation found for rpc %s", msg.getRpc());
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+ message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+ "operation-not-supported", message)))), self);
+ return;
+ }
+
+ finishInvokeRpc(actorRefList, msg, sender, self);
+ }
+ };
+
+ future.onComplete(onComplete, getContext().dispatcher());