+ protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
+ final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
+
+ final RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
+
+ final Node serializedNode = NormalizedNodeSerializer.serialize(msg.getInput());
+ final ExecuteRpc executeMsg = new ExecuteRpc(serializedNode, msg.getRpc());
+
+ final scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
+
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("ExecuteRpc failed", failure);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ return;
+ }
+
+ sender.tell(reply, self);
+ }
+ };
+
+ future.onComplete(onComplete, getContext().dispatcher());
+ }
+
+ private void executeRpc(final ExecuteRpc msg) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ }
+ final NormalizedNode<?, ?> input = NormalizedNodeSerializer.deSerialize(msg.getInputNormalizedNode());
+ final SchemaPath schemaPath = SchemaPath.create(true, msg.getRpc());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> future = rpcService.invokeRpc(schemaPath, input);
+
+ final ListenableFuture<DOMRpcResult> listenableFuture =
+ JdkFutureAdapters.listenInPoolThread(future);
+
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+
+ Futures.addCallback(listenableFuture, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(final DOMRpcResult result) {
+ if (result.getErrors() != null && ( ! result.getErrors().isEmpty())) {
+ final String message = String.format("Execution of RPC %s failed", msg.getRpc());
+ Collection<RpcError> errors = result.getErrors();
+ if(errors == null || errors.size() == 0) {
+ errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+ null, message));
+ }
+
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+ message, errors)), self);
+ } else {
+ final Node serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
+ sender.tell(new RpcResponse(serializedResultNode), self);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ });
+ }
+
+ private static class RpcBrokerCreator implements Creator<RpcBroker> {
+ private static final long serialVersionUID = 1L;
+
+ final DOMRpcService rpcService;
+ final ActorRef rpcRegistry;
+
+ RpcBrokerCreator(final DOMRpcService rpcService, final ActorRef rpcRegistry) {
+ this.rpcService = rpcService;
+ this.rpcRegistry = rpcRegistry;
+ }
+
+ @Override
+ public RpcBroker create() throws Exception {
+ return new RpcBroker(rpcService, rpcRegistry);
+ }
+ }