+ private void executeRpc(final ExecuteRpc msg) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ }
+ Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
+ XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
+ schemaContext));
+
+ ListenableFuture<RpcResult<CompositeNode>> listenableFuture =
+ JdkFutureAdapters.listenInPoolThread(future);
+
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+
+ Futures.addCallback(listenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+ @Override
+ public void onSuccess(RpcResult<CompositeNode> result) {
+ if(result.isSuccessful()) {
+ sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(),
+ schemaContext)), self);
+ } else {
+ String message = String.format("Execution of RPC %s failed", msg.getRpc());
+ Collection<RpcError> errors = result.getErrors();
+ if(errors == null || errors.size() == 0) {
+ errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+ null, message));
+ }
+
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+ message, errors)), self);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ });
+ }
+
+ private static class RpcBrokerCreator implements Creator<RpcBroker> {
+ private static final long serialVersionUID = 1L;
+
+ final Broker.ProviderSession brokerSession;
+ final ActorRef rpcRegistry;
+ final SchemaContext schemaContext;
+
+ RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry,
+ SchemaContext schemaContext) {
+ this.brokerSession = brokerSession;
+ this.rpcRegistry = rpcRegistry;
+ this.schemaContext = schemaContext;
+ }
+
+ @Override
+ public RpcBroker create() throws Exception {
+ return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+ }
+ }