1 package org.opendaylight.controller.remote.rpc;
3 import static akka.pattern.Patterns.ask;
4 import akka.actor.ActorRef;
5 import akka.dispatch.OnComplete;
6 import akka.util.Timeout;
8 import com.google.common.util.concurrent.ListenableFuture;
9 import com.google.common.util.concurrent.SettableFuture;
11 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
12 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
13 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
14 import org.opendaylight.controller.xml.codec.XmlUtils;
15 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
16 import org.opendaylight.controller.sal.core.api.RpcImplementation;
17 import org.opendaylight.yangtools.yang.common.QName;
18 import org.opendaylight.yangtools.yang.common.RpcResult;
19 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
20 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
21 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import scala.concurrent.ExecutionContext;
29 import java.util.Collections;
32 public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
33 private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
34 private final ActorRef rpcBroker;
35 private final SchemaContext schemaContext;
37 public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
38 this.rpcBroker = rpcBroker;
39 this.schemaContext = schemaContext;
43 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc,
44 YangInstanceIdentifier identifier, CompositeNode input) {
45 InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
47 return executeMsg(rpcMsg);
51 public Set<QName> getSupportedRpcs() {
52 // TODO : check if we need to get this from routing registry
53 return Collections.emptySet();
57 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
58 InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
59 return executeMsg(rpcMsg);
62 private ListenableFuture<RpcResult<CompositeNode>> executeMsg(InvokeRpc rpcMsg) {
64 final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
66 scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
67 new Timeout(ActorUtil.ASK_DURATION));
69 OnComplete<Object> onComplete = new OnComplete<Object>() {
71 public void onComplete(Throwable failure, Object reply) throws Throwable {
73 LOG.error("InvokeRpc failed", failure);
75 RpcResult<CompositeNode> rpcResult;
76 if(failure instanceof RpcErrorsException) {
77 rpcResult = RpcResultBuilder.<CompositeNode>failed().withRpcErrors(
78 ((RpcErrorsException)failure).getRpcErrors()).build();
80 rpcResult = RpcResultBuilder.<CompositeNode>failed().withError(
81 ErrorType.RPC, failure.getMessage(), failure).build();
84 listenableFuture.set(rpcResult);
88 RpcResponse rpcReply = (RpcResponse)reply;
89 CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode());
90 listenableFuture.set(RpcResultBuilder.success(result).build());
94 future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
96 return listenableFuture;