package org.opendaylight.controller.remote.rpc; import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.util.Timeout; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import java.util.Collections; import java.util.Set; public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation { private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); private final ActorRef rpcBroker; private final SchemaContext schemaContext; public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) { this.rpcBroker = rpcBroker; this.schemaContext = schemaContext; } @Override public ListenableFuture> invokeRpc(QName rpc, YangInstanceIdentifier identifier, CompositeNode input) { InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input); return executeMsg(rpcMsg); } @Override public Set getSupportedRpcs() { // TODO : check if we need to get this from routing registry return Collections.emptySet(); } @Override public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input); return executeMsg(rpcMsg); } private ListenableFuture> executeMsg(InvokeRpc rpcMsg) { final SettableFuture> listenableFuture = SettableFuture.create(); scala.concurrent.Future future = ask(rpcBroker, rpcMsg, new Timeout(ActorUtil.ASK_DURATION)); OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object reply) throws Throwable { if(failure != null) { LOG.error("InvokeRpc failed", failure); RpcResult rpcResult; if(failure instanceof RpcErrorsException) { rpcResult = RpcResultBuilder.failed().withRpcErrors( ((RpcErrorsException)failure).getRpcErrors()).build(); } else { rpcResult = RpcResultBuilder.failed().withError( ErrorType.RPC, failure.getMessage(), failure).build(); } listenableFuture.set(rpcResult); return; } RpcResponse rpcReply = (RpcResponse)reply; CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode()); listenableFuture.set(RpcResultBuilder.success(result).build()); } }; future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global()); return listenableFuture; } }