From 7f8512fcbe4ac373995b7e2e370d38a01f4eaeec Mon Sep 17 00:00:00 2001 From: tpantelis Date: Sun, 24 Aug 2014 20:29:58 -0400 Subject: [PATCH] Bug 1637: Change Rpc actor calls to async Changed RemoteRpcImplementation and RpcBroker to use async calls and modified/created unit tests. Added an RpcErrorsException to transfer RpcError info on failure. This is now used in lieu of ErrorResponse to communicate failures. Change-Id: Ib0bbda1867caff9b6c584b9b8a9b336f8acf53d7 Signed-off-by: tpantelis --- .../controller/xml/codec/XmlUtils.java | 34 +- .../remote/rpc/RemoteRpcImplementation.java | 133 ++++--- .../controller/remote/rpc/RpcBroker.java | 230 +++++++---- .../remote/rpc/RpcErrorsException.java | 78 ++++ .../remote/rpc/messages/ErrorResponse.java | 26 -- .../remote/rpc/utils/ActorUtil.java | 29 -- .../remote/rpc/AbstractRpcTest.java | 175 +++++++++ .../rpc/RemoteRpcImplementationTest.java | 185 +++++++++ .../controller/remote/rpc/RpcBrokerTest.java | 370 ++++++++++++------ .../src/test/resources/test-rpc.yang | 24 ++ 10 files changed, 968 insertions(+), 316 deletions(-) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java delete mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java index 5848561676..ea8f4a3ef1 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java @@ -75,9 +75,13 @@ public class XmlUtils { */ public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){ LOG.debug("Converting input composite node to xml {}", cNode); - if (cNode == null) return BLANK; + if (cNode == null) { + return BLANK; + } - if(schemaContext == null) return BLANK; + if(schemaContext == null) { + return BLANK; + } Document domTree = null; try { @@ -108,9 +112,13 @@ public class XmlUtils { */ public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){ LOG.debug("Converting output composite node to xml {}", cNode); - if (cNode == null) return BLANK; + if (cNode == null) { + return BLANK; + } - if(schemaContext == null) return BLANK; + if(schemaContext == null) { + return BLANK; + } Document domTree = null; try { @@ -150,7 +158,9 @@ public class XmlUtils { } public static CompositeNode xmlToCompositeNode(String xml){ - if (xml==null || xml.length()==0) return null; + if (xml==null || xml.length()==0) { + return null; + } Node dataTree; try { @@ -179,11 +189,17 @@ public class XmlUtils { */ public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){ LOG.debug("Converting input xml to composite node {}", xml); - if (xml==null || xml.length()==0) return null; + if (xml==null || xml.length()==0) { + return null; + } - if(rpc == null) return null; + if(rpc == null) { + return null; + } - if(schemaContext == null) return null; + if(schemaContext == null) { + return null; + } CompositeNode compositeNode = null; try { @@ -213,7 +229,7 @@ public class XmlUtils { LOG.debug("Converted xml input to list of nodes {}", dataNodes); final CompositeNodeBuilder it = ImmutableCompositeNode.builder(); - it.setQName(input); + it.setQName(rpc); it.add(ImmutableCompositeNode.create(input, dataNodes)); compositeNode = it.toInstance(); break; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 4496bd3263..7d7dbf0f3a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -1,9 +1,13 @@ package org.opendaylight.controller.remote.rpc; +import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; -import com.google.common.util.concurrent.Futures; +import akka.dispatch.OnComplete; +import akka.util.Timeout; + import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; +import com.google.common.util.concurrent.SettableFuture; + import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.remote.rpc.utils.ActorUtil; @@ -13,73 +17,82 @@ import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; + import java.util.Collections; import java.util.Set; -public class RemoteRpcImplementation implements RpcImplementation, - RoutedRpcDefaultImplementation { - private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); - private ActorRef rpcBroker; - private SchemaContext schemaContext; - - public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) { - this.rpcBroker = rpcBroker; - this.schemaContext = schemaContext; - } - - @Override - public ListenableFuture> invokeRpc(QName rpc, YangInstanceIdentifier identifier, CompositeNode input) { - InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input); - - return executeMsg(rpcMsg); - } - - @Override - public Set getSupportedRpcs() { - // TODO : check if we need to get this from routing registry - return Collections.emptySet(); - } - - @Override - public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { - InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input); - return executeMsg(rpcMsg); - } - - private ListenableFuture> executeMsg(Object rpcMsg) { - ListenableFuture> listenableFuture = null; - - try { - Object response = ActorUtil.executeOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION); - if(response instanceof RpcResponse) { - - RpcResponse rpcResponse = (RpcResponse) response; - CompositeNode result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()); - listenableFuture = Futures.immediateFuture(RpcResultBuilder.success(result).build()); - - } else if(response instanceof ErrorResponse) { - - ErrorResponse errorResponse = (ErrorResponse) response; - Exception e = errorResponse.getException(); - final RpcResultBuilder failed = RpcResultBuilder.failed(); - failed.withError(null, null, e.getMessage(), null, null, e.getCause()); - listenableFuture = Futures.immediateFuture(failed.build()); - - } - } catch (Exception e) { - LOG.error("Error occurred while invoking RPC actor {}", e); - - final RpcResultBuilder failed = RpcResultBuilder.failed(); - failed.withError(null, null, e.getMessage(), null, null, e.getCause()); - listenableFuture = Futures.immediateFuture(failed.build()); +public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation { + private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); + private final ActorRef rpcBroker; + private final SchemaContext schemaContext; + + public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) { + this.rpcBroker = rpcBroker; + this.schemaContext = schemaContext; + } + + @Override + public ListenableFuture> invokeRpc(QName rpc, + YangInstanceIdentifier identifier, CompositeNode input) { + InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input); + + return executeMsg(rpcMsg); + } + + @Override + public Set getSupportedRpcs() { + // TODO : check if we need to get this from routing registry + return Collections.emptySet(); + } + + @Override + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { + InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input); + return executeMsg(rpcMsg); } - return listenableFuture; - } + private ListenableFuture> executeMsg(InvokeRpc rpcMsg) { + + final SettableFuture> listenableFuture = SettableFuture.create(); + + scala.concurrent.Future future = ask(rpcBroker, rpcMsg, + new Timeout(ActorUtil.ASK_DURATION)); + + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object reply) throws Throwable { + if(failure != null) { + LOG.error("InvokeRpc failed", failure); + + RpcResult rpcResult; + if(failure instanceof RpcErrorsException) { + rpcResult = RpcResultBuilder.failed().withRpcErrors( + ((RpcErrorsException)failure).getRpcErrors()).build(); + } else { + rpcResult = RpcResultBuilder.failed().withError( + ErrorType.RPC, failure.getMessage(), failure).build(); + } + + listenableFuture.set(rpcResult); + return; + } + + RpcResponse rpcReply = (RpcResponse)reply; + CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode()); + listenableFuture.set(RpcResultBuilder.success(result).build()); + } + }; + + future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global()); + + return listenableFuture; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 4ec96c29cd..2aca655d26 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -8,11 +8,14 @@ package org.opendaylight.controller.remote.rpc; +import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.actor.Props; +import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Pair; -import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; +import akka.util.Timeout; + import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; @@ -23,12 +26,23 @@ import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.Future; @@ -38,81 +52,159 @@ import java.util.concurrent.Future; public class RpcBroker extends AbstractUntypedActor { - private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); - private final Broker.ProviderSession brokerSession; - private final ActorRef rpcRegistry; - private SchemaContext schemaContext; - - private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){ - this.brokerSession = brokerSession; - this.rpcRegistry = rpcRegistry; - this.schemaContext = schemaContext; - } - - public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){ - return Props.create(new Creator(){ - - @Override - public RpcBroker create() throws Exception { - return new RpcBroker(brokerSession, rpcRegistry, schemaContext); - } - }); - } - @Override - protected void handleReceive(Object message) throws Exception { - if(message instanceof InvokeRpc) { - invokeRemoteRpc((InvokeRpc) message); - } else if(message instanceof ExecuteRpc) { - executeRpc((ExecuteRpc) message); + private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); + private final Broker.ProviderSession brokerSession; + private final ActorRef rpcRegistry; + private final SchemaContext schemaContext; + + private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, + SchemaContext schemaContext) { + this.brokerSession = brokerSession; + this.rpcRegistry = rpcRegistry; + this.schemaContext = schemaContext; } - } - - private void invokeRemoteRpc(InvokeRpc msg) { - // Look up the remote actor to execute rpc - LOG.debug("Looking up the remote actor for route {}", msg); - try { - // Find router - RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier()); - RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId); - RpcRegistry.Messages.FindRoutersReply rpcReply = - (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); - - List> actorRefList = rpcReply.getRouterWithUpdateTime(); - - if(actorRefList == null || actorRefList.isEmpty()) { - LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc()); - - getSender().tell(new ErrorResponse( - new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self()); - } else { - RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList); - ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); - Object operationRes = ActorUtil.executeOperation(logic.select(), - executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION); + public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, + SchemaContext schemaContext) { + return Props.create(new RpcBrokerCreator(brokerSession, rpcRegistry, schemaContext)); + } - getSender().tell(operationRes, self()); - } - } catch (Exception e) { - LOG.error("invokeRemoteRpc: {}", e); - getSender().tell(new ErrorResponse(e), self()); + @Override + protected void handleReceive(Object message) throws Exception { + if(message instanceof InvokeRpc) { + invokeRemoteRpc((InvokeRpc) message); + } else if(message instanceof ExecuteRpc) { + executeRpc((ExecuteRpc) message); + } } - } + private void invokeRemoteRpc(final InvokeRpc msg) { + LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); + + RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( + null, msg.getRpc(), msg.getIdentifier()); + RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); + + scala.concurrent.Future future = ask(rpcRegistry, findMsg, + new Timeout(ActorUtil.LOCAL_ASK_DURATION)); + final ActorRef sender = getSender(); + final ActorRef self = self(); - private void executeRpc(ExecuteRpc msg) { - LOG.debug("Executing rpc for rpc {}", msg.getRpc()); - try { - Future> rpc = brokerSession.rpc(msg.getRpc(), - XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext)); - RpcResult rpcResult = rpc != null ? rpc.get():null; - CompositeNode result = rpcResult != null ? rpcResult.getResult() : null; - getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self()); - } catch (Exception e) { - LOG.error("executeRpc: {}", e); - getSender().tell(new ErrorResponse(e), self()); + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object reply) throws Throwable { + if(failure != null) { + LOG.error("FindRouters failed", failure); + sender.tell(new akka.actor.Status.Failure(failure), self); + return; + } + + RpcRegistry.Messages.FindRoutersReply findReply = + (RpcRegistry.Messages.FindRoutersReply)reply; + + List> actorRefList = findReply.getRouterWithUpdateTime(); + + if(actorRefList == null || actorRefList.isEmpty()) { + String message = String.format( + "No remote implementation found for rpc %s", msg.getRpc()); + sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( + message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, + "operation-not-supported", message)))), self); + return; + } + + finishInvokeRpc(actorRefList, msg, sender, self); + } + }; + + future.onComplete(onComplete, getContext().dispatcher()); } - } + protected void finishInvokeRpc(final List> actorRefList, + final InvokeRpc msg, final ActorRef sender, final ActorRef self) { + + RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList); + + ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), + schemaContext), msg.getRpc()); + + scala.concurrent.Future future = ask(logic.select(), executeMsg, + new Timeout(ActorUtil.REMOTE_ASK_DURATION)); + + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object reply) throws Throwable { + if(failure != null) { + LOG.error("ExecuteRpc failed", failure); + sender.tell(new akka.actor.Status.Failure(failure), self); + return; + } + + sender.tell(reply, self); + } + }; + + future.onComplete(onComplete, getContext().dispatcher()); + } + + private void executeRpc(final ExecuteRpc msg) { + LOG.debug("Executing rpc {}", msg.getRpc()); + + Future> future = brokerSession.rpc(msg.getRpc(), + XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), + schemaContext)); + + ListenableFuture> listenableFuture = + JdkFutureAdapters.listenInPoolThread(future); + + final ActorRef sender = getSender(); + final ActorRef self = self(); + + Futures.addCallback(listenableFuture, new FutureCallback>() { + @Override + public void onSuccess(RpcResult result) { + if(result.isSuccessful()) { + sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(), + schemaContext)), self); + } else { + String message = String.format("Execution of RPC %s failed", msg.getRpc()); + Collection errors = result.getErrors(); + if(errors == null || errors.size() == 0) { + errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, + null, message)); + } + + sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( + message, errors)), self); + } + } + + @Override + public void onFailure(Throwable t) { + LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t); + sender.tell(new akka.actor.Status.Failure(t), self); + } + }); + } + + private static class RpcBrokerCreator implements Creator { + private static final long serialVersionUID = 1L; + + final Broker.ProviderSession brokerSession; + final ActorRef rpcRegistry; + final SchemaContext schemaContext; + + RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry, + SchemaContext schemaContext) { + this.brokerSession = brokerSession; + this.rpcRegistry = rpcRegistry; + this.schemaContext = schemaContext; + } + + @Override + public RpcBroker create() throws Exception { + return new RpcBroker(brokerSession, rpcRegistry, schemaContext); + } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java new file mode 100644 index 0000000000..7e4d8a034e --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; + +/** + * An Exception for transferring RpcErrors. + * + * @author Thomas Pantelis + */ +public class RpcErrorsException extends Exception { + + private static final long serialVersionUID = 1L; + + private static class RpcErrorData implements Serializable { + private static final long serialVersionUID = 1L; + + final ErrorSeverity severity; + final ErrorType errorType; + final String tag; + final String applicationTag; + final String message; + final String info; + final Throwable cause; + + RpcErrorData(ErrorSeverity severity, ErrorType errorType, String tag, + String applicationTag, String message, String info, Throwable cause) { + this.severity = severity; + this.errorType = errorType; + this.tag = tag; + this.applicationTag = applicationTag; + this.message = message; + this.info = info; + this.cause = cause; + } + } + + private final List rpcErrorDataList = new ArrayList<>(); + + public RpcErrorsException(String message, Iterable rpcErrors) { + super(message); + + for(RpcError rpcError: rpcErrors) { + rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(), + rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(), + rpcError.getInfo(), rpcError.getCause())); + } + } + + public Collection getRpcErrors() { + Collection rpcErrors = new ArrayList<>(); + for(RpcErrorData ed: rpcErrorDataList) { + RpcError rpcError = ed.severity == ErrorSeverity.ERROR ? + RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag, + ed.info, ed.cause) : + RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag, + ed.info, ed.cause); + rpcErrors.add(rpcError); + } + + return rpcErrors; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java deleted file mode 100644 index 2c26243fe9..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.remote.rpc.messages; - -import com.google.common.base.Preconditions; - -import java.io.Serializable; - -public class ErrorResponse implements Serializable { - - private final Exception exception; - - public ErrorResponse(final Exception e) { - Preconditions.checkNotNull(e, "Exception should be present for error message"); - this.exception = e; - } - - public Exception getException() { - return exception; - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java index ca14fecb4c..e2baffa1b1 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java @@ -8,44 +8,15 @@ */ package org.opendaylight.controller.remote.rpc.utils; -import akka.actor.ActorRef; -import akka.util.Timeout; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; -import static akka.pattern.Patterns.ask; - public class ActorUtil { public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS); public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS); public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS); - public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS); - public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS); - public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS); public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS); public static final String MAILBOX = "bounded-mailbox"; - - - /** - * Executes an operation on a local actor and wait for it's response - * - * @param actor - * @param message - * @param askDuration - * @param awaitDuration - * @return The response of the operation - */ - public static Object executeOperation(ActorRef actor, Object message, - FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception { - Future future = - ask(actor, message, new Timeout(askDuration)); - - return Await.result(future, awaitDuration); - } - - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java new file mode 100644 index 0000000000..8d886829aa --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.mockito.Mockito; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; + +import com.google.common.collect.ImmutableList; +import com.typesafe.config.ConfigFactory; + +/** + * Base class for RPC tests. + * + * @author Thomas Pantelis + */ +public class AbstractRpcTest { + static final String TEST_REV = "2014-08-28"; + static final String TEST_NS = "urn:test"; + static final URI TEST_URI = URI.create(TEST_NS); + static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-rpc"); + static final QName TEST_RPC_INPUT = QName.create(TEST_NS, TEST_REV, "input"); + static final QName TEST_RPC_INPUT_DATA = QName.create(TEST_NS, TEST_REV, "input-data"); + static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output"); + static final QName TEST_RPC_OUTPUT_DATA = new QName(TEST_URI, "output-data"); + + static ActorSystem node1; + static ActorSystem node2; + + protected ActorRef rpcBroker1; + protected JavaTestKit probeReg1; + protected ActorRef rpcBroker2; + protected JavaTestKit probeReg2; + protected Broker.ProviderSession brokerSession; + protected SchemaContext schemaContext; + + @BeforeClass + public static void setup() throws InterruptedException { + node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); + node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(node1); + JavaTestKit.shutdownActorSystem(node2); + node1 = null; + node2 = null; + } + + @Before + public void setUp() { + schemaContext = new YangParserImpl().parseFiles(Arrays.asList( + new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath()))); + + brokerSession = Mockito.mock(Broker.ProviderSession.class); + probeReg1 = new JavaTestKit(node1); + rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext)); + probeReg2 = new JavaTestKit(node2); + rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext)); + + } + + static void assertRpcErrorEquals(RpcError rpcError, ErrorSeverity severity, + ErrorType errorType, String tag, String message, String applicationTag, String info, + String causeMsg) { + assertEquals("getSeverity", severity, rpcError.getSeverity()); + assertEquals("getErrorType", errorType, rpcError.getErrorType()); + assertEquals("getTag", tag, rpcError.getTag()); + assertTrue("getMessage contains " + message, rpcError.getMessage().contains(message)); + assertEquals("getApplicationTag", applicationTag, rpcError.getApplicationTag()); + assertEquals("getInfo", info, rpcError.getInfo()); + + if(causeMsg == null) { + assertNull("Unexpected cause " + rpcError.getCause(), rpcError.getCause()); + } else { + assertEquals("Cause message", causeMsg, rpcError.getCause().getMessage()); + } + } + + static void assertCompositeNodeEquals(CompositeNode exp, CompositeNode actual) { + assertEquals("NodeType getNamespace", exp.getNodeType().getNamespace(), + actual.getNodeType().getNamespace()); + assertEquals("NodeType getLocalName", exp.getNodeType().getLocalName(), + actual.getNodeType().getLocalName()); + for(Node child: exp.getValue()) { + List> c = actual.get(child.getNodeType()); + assertNotNull("Missing expected child " + child.getNodeType(), c); + if(child instanceof CompositeNode) { + assertCompositeNodeEquals((CompositeNode) child, (CompositeNode)c.get(0)); + } else { + assertEquals("Value for Node " + child.getNodeType(), child.getValue(), + c.get(0).getValue()); + } + } + } + + static CompositeNode makeRPCInput(String data) { + CompositeNodeBuilder builder = ImmutableCompositeNode.builder() + .setQName(TEST_RPC_INPUT).addLeaf(TEST_RPC_INPUT_DATA, data); + return ImmutableCompositeNode.create( + TEST_RPC, ImmutableList.>of(builder.toInstance())); + } + + static CompositeNode makeRPCOutput(String data) { + CompositeNodeBuilder builder = ImmutableCompositeNode.builder() + .setQName(TEST_RPC_OUTPUT).addLeaf(TEST_RPC_OUTPUT_DATA, data); + return ImmutableCompositeNode.create( + TEST_RPC, ImmutableList.>of(builder.toInstance())); + } + + static void assertFailedRpcResult(RpcResult rpcResult, ErrorSeverity severity, + ErrorType errorType, String tag, String message, String applicationTag, String info, + String causeMsg) { + + assertNotNull("RpcResult was null", rpcResult); + assertEquals("isSuccessful", false, rpcResult.isSuccessful()); + Collection rpcErrors = rpcResult.getErrors(); + assertEquals("RpcErrors count", 1, rpcErrors.size()); + assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message, + applicationTag, info, causeMsg); + } + + static void assertSuccessfulRpcResult(RpcResult rpcResult, + CompositeNode expOutput) { + + assertNotNull("RpcResult was null", rpcResult); + assertEquals("isSuccessful", true, rpcResult.isSuccessful()); + assertCompositeNodeEquals(expOutput, rpcResult.getResult()); + } + + static class TestException extends Exception { + private static final long serialVersionUID = 1L; + + static final String MESSAGE = "mock error"; + + TestException() { + super(MESSAGE); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java new file mode 100644 index 0000000000..6c3a57b344 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import static org.junit.Assert.assertEquals; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.controller.xml.codec.XmlUtils; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +import akka.testkit.JavaTestKit; + +import com.google.common.util.concurrent.ListenableFuture; + +/*** + * Unit tests for RemoteRpcImplementation. + * + * @author Thomas Pantelis + */ +public class RemoteRpcImplementationTest extends AbstractRpcTest { + + @Test + public void testInvokeRpc() throws Exception { + final AtomicReference assertError = new AtomicReference<>(); + try { + RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( + probeReg1.getRef(), schemaContext); + + final CompositeNode input = makeRPCInput("foo"); + final CompositeNode output = makeRPCOutput("bar"); + final AtomicReference invokeRpcMsg = setupInvokeRpcReply(assertError, output); + + ListenableFuture> future = rpcImpl.invokeRpc(TEST_RPC, input); + + RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); + + assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0)); + + assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc()); + assertEquals("getInput", input, invokeRpcMsg.get().getInput()); + } finally { + if(assertError.get() != null) { + throw assertError.get(); + } + } + } + + @Test + public void testInvokeRpcWithIdentifier() throws Exception { + final AtomicReference assertError = new AtomicReference<>(); + try { + RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( + probeReg1.getRef(), schemaContext); + + QName instanceQName = new QName(new URI("ns"), "instance"); + YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName); + + CompositeNode input = makeRPCInput("foo"); + CompositeNode output = makeRPCOutput("bar"); + final AtomicReference invokeRpcMsg = setupInvokeRpcReply(assertError, output); + + ListenableFuture> future = rpcImpl.invokeRpc( + TEST_RPC, identifier, input); + + RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); + + assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0)); + + assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc()); + assertEquals("getInput", input, invokeRpcMsg.get().getInput()); + assertEquals("getRoute", identifier, invokeRpcMsg.get().getIdentifier()); + } finally { + if(assertError.get() != null) { + throw assertError.get(); + } + } + } + + @Test + public void testInvokeRpcWithRpcErrorsException() throws Exception { + final AtomicReference assertError = new AtomicReference<>(); + try { + RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( + probeReg1.getRef(), schemaContext); + + final CompositeNode input = makeRPCInput("foo"); + + setupInvokeRpcErrorReply(assertError, new RpcErrorsException( + "mock", Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, "tag", + "error", "appTag", "info", null)))); + + ListenableFuture> future = rpcImpl.invokeRpc(TEST_RPC, input); + + RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); + + assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "tag", + "error", "appTag", "info", null); + } finally { + if(assertError.get() != null) { + throw assertError.get(); + } + } + } + + @Test + public void testInvokeRpcWithOtherException() throws Exception { + final AtomicReference assertError = new AtomicReference<>(); + try { + RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( + probeReg1.getRef(), schemaContext); + + final CompositeNode input = makeRPCInput("foo"); + + setupInvokeRpcErrorReply(assertError, new TestException()); + + ListenableFuture> future = rpcImpl.invokeRpc(TEST_RPC, input); + + RpcResult rpcResult = future.get(5, TimeUnit.SECONDS); + + assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "operation-failed", + TestException.MESSAGE, null, null, TestException.MESSAGE); + } finally { + if(assertError.get() != null) { + throw assertError.get(); + } + } + } + + private AtomicReference setupInvokeRpcReply( + final AtomicReference assertError, final CompositeNode output) { + return setupInvokeRpcReply(assertError, output, null); + } + + private AtomicReference setupInvokeRpcErrorReply( + final AtomicReference assertError, final Exception error) { + return setupInvokeRpcReply(assertError, null, error); + } + + private AtomicReference setupInvokeRpcReply( + final AtomicReference assertError, final CompositeNode output, + final Exception error) { + final AtomicReference invokeRpcMsg = new AtomicReference<>(); + + new Thread() { + @Override + public void run() { + try { + invokeRpcMsg.set(probeReg1.expectMsgClass( + JavaTestKit.duration("5 seconds"), InvokeRpc.class)); + + if(output != null) { + probeReg1.reply(new RpcResponse(XmlUtils.outputCompositeNodeToXml( + output, schemaContext))); + } else { + probeReg1.reply(new akka.actor.Status.Failure(error)); + } + + } catch(AssertionError e) { + assertError.set(e); + } + } + + }.start(); + + return invokeRpcMsg; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index d9a3b6a414..28b1b476cd 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -10,144 +10,268 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.japi.Pair; import akka.testkit.JavaTestKit; + +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; -import com.typesafe.config.ConfigFactory; -import junit.framework.Assert; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; +import static org.junit.Assert.assertEquals; import org.junit.Test; -import org.mockito.Mockito; -import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; +import org.mockito.ArgumentCaptor; +import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.ModifyAction; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; - +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.concurrent.Future; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.any; + +public class RpcBrokerTest extends AbstractRpcTest { + + @Test + public void testInvokeRpcWithNoRemoteActor() throws Exception { + new JavaTestKit(node1) {{ + CompositeNode input = makeRPCInput("foo"); + + InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input); + rpcBroker1.tell(invokeMsg, getRef()); + + probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class); + probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( + Collections.>emptyList())); + + akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), + akka.actor.Status.Failure.class); + + assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); + }}; + } + + + /** + * This test method invokes and executes the remote rpc + */ + //@Test + public void testInvokeRpc() throws URISyntaxException { + new JavaTestKit(node1) {{ + QName instanceQName = new QName(new URI("ns"), "instance"); + + CompositeNode invokeRpcResult = makeRPCOutput("bar"); + RpcResult rpcResult = + RpcResultBuilder.success(invokeRpcResult).build(); + ArgumentCaptor inputCaptor = new ArgumentCaptor<>(); + when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture())) + .thenReturn(Futures.immediateFuture(rpcResult)); + + // invoke rpc + CompositeNode input = makeRPCInput("foo"); + YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName); + InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input); + rpcBroker1.tell(invokeMsg, getRef()); + + FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + RouteIdentifier routeIdentifier = findRouters.getRouteIdentifier(); + assertEquals("getType", TEST_RPC, routeIdentifier.getType()); + assertEquals("getRoute", instanceID, routeIdentifier.getRoute()); + + probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( + Arrays.asList(new Pair(rpcBroker2, 200L)))); + + RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); + assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0), + XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode())); + assertCompositeNodeEquals(input, inputCaptor.getValue()); + }}; + } + + @Test + public void testInvokeRpcWithNoOutput() { + new JavaTestKit(node1) {{ + + RpcResult rpcResult = RpcResultBuilder.success().build(); + when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) + .thenReturn(Futures.immediateFuture(rpcResult)); + + InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo")); + rpcBroker1.tell(invokeMsg, getRef()); + + probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( + Arrays.asList(new Pair(rpcBroker2, 200L)))); + + RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); + + assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode()); + }}; + } + + @Test + public void testInvokeRpcWithExecuteFailure() { + new JavaTestKit(node1) {{ + + RpcResult rpcResult = RpcResultBuilder.failed() + .withError(ErrorType.RPC, "tag", "error", "appTag", "info", + new Exception("mock")) + .build(); + when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) + .thenReturn(Futures.immediateFuture(rpcResult)); + + InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo")); + rpcBroker1.tell(invokeMsg, getRef()); + + probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply( + Arrays.asList(new Pair(rpcBroker2, 200L)))); + + akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), + akka.actor.Status.Failure.class); + + assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); + + RpcErrorsException errorsEx = (RpcErrorsException)failure.cause(); + List rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors()); + assertEquals("RpcErrors count", 1, rpcErrors.size()); + assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag", + "error", "appTag", "info", "mock"); + }}; + } + + @Test + public void testInvokeRpcWithFindRoutersFailure() { + new JavaTestKit(node1) {{ + + InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo")); + rpcBroker1.tell(invokeMsg, getRef()); + + probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); + probeReg1.reply(new akka.actor.Status.Failure(new TestException())); + + akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), + akka.actor.Status.Failure.class); + + assertEquals("failure.cause()", TestException.class, failure.cause().getClass()); + }}; + } + + @Test + public void testExecuteRpc() { + new JavaTestKit(node1) {{ + + String xml = "foo"; + + CompositeNode invokeRpcResult = makeRPCOutput("bar"); + RpcResult rpcResult = + RpcResultBuilder.success(invokeRpcResult).build(); + ArgumentCaptor inputCaptor = new ArgumentCaptor<>(); + when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture())) + .thenReturn(Futures.immediateFuture(rpcResult)); + + ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); + + rpcBroker1.tell(executeMsg, getRef()); + + RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); + + assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0), + XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode())); + }}; + } + + @Test + public void testExecuteRpcFailureWithRpcErrors() { + new JavaTestKit(node1) {{ + + String xml = "foo"; + + RpcResult rpcResult = RpcResultBuilder.failed() + .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1", + new Exception("mock")) + .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null) + .build(); + when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) + .thenReturn(Futures.immediateFuture(rpcResult)); + + ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); + + rpcBroker1.tell(executeMsg, getRef()); + + akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), + akka.actor.Status.Failure.class); + + assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); + + RpcErrorsException errorsEx = (RpcErrorsException)failure.cause(); + List rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors()); + assertEquals("RpcErrors count", 2, rpcErrors.size()); + assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1", + "error", "appTag1", "info1", "mock"); + assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2", + "warning", "appTag2", "info2", null); + }}; + } + + @Test + public void testExecuteRpcFailureWithNoRpcErrors() { + new JavaTestKit(node1) {{ + + String xml = "foo"; + + RpcResult rpcResult = RpcResultBuilder.failed().build(); + when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) + .thenReturn(Futures.immediateFuture(rpcResult)); + + ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); + + rpcBroker1.tell(executeMsg, getRef()); + + akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), + akka.actor.Status.Failure.class); + + assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass()); + + RpcErrorsException errorsEx = (RpcErrorsException)failure.cause(); + List rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors()); + assertEquals("RpcErrors count", 1, rpcErrors.size()); + assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, + "operation-failed", "failed", null, null, null); + }}; + } + + @Test + public void testExecuteRpcFailureWithException() { + new JavaTestKit(node1) {{ + + String xml = "foo"; + + when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class))) + .thenReturn(Futures.>immediateFailedFuture( + new TestException())); + + ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC); + + rpcBroker1.tell(executeMsg, getRef()); + + akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"), + akka.actor.Status.Failure.class); -public class RpcBrokerTest { - - static ActorSystem node1; - static ActorSystem node2; - private ActorRef rpcBroker1; - private JavaTestKit probeReg1; - private ActorRef rpcBroker2; - private JavaTestKit probeReg2; - private Broker.ProviderSession brokerSession; - - - @BeforeClass - public static void setup() throws InterruptedException { - node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); - node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(node1); - JavaTestKit.shutdownActorSystem(node2); - node1 = null; - node2 = null; - } - - @Before - public void createActor() { - brokerSession = Mockito.mock(Broker.ProviderSession.class); - SchemaContext schemaContext = mock(SchemaContext.class); - probeReg1 = new JavaTestKit(node1); - rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext)); - probeReg2 = new JavaTestKit(node2); - rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext)); - - } - @Test - public void testInvokeRpcError() throws Exception { - new JavaTestKit(node1) {{ - QName rpc = new QName(new URI("noactor1"), "noactor1"); - CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList>(), ModifyAction.REPLACE); - - - InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input); - rpcBroker1.tell(invokeMsg, getRef()); - probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); - probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(new ArrayList>())); - - Boolean getMsg = new ExpectMsg("ErrorResponse") { - protected Boolean match(Object in) { - if (in instanceof ErrorResponse) { - ErrorResponse reply = (ErrorResponse)in; - return reply.getException().getMessage().contains("No remote actor found for rpc execution of :"); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - Assert.assertTrue(getMsg); - - }}; - } - - - /** - * This test method invokes and executes the remote rpc - */ - - @Test - public void testInvokeRpc() throws URISyntaxException { - new JavaTestKit(node1) {{ - QName rpc = new QName(new URI("noactor1"), "noactor1"); - // invoke rpc - CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList>(), ModifyAction.REPLACE); - InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input); - rpcBroker1.tell(invokeMsg, getRef()); - - probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class); - List> routerList = new ArrayList>(); - - routerList.add(new Pair(rpcBroker2, 200L)); - - probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(routerList)); - - CompositeNode invokeRpcResult = mock(CompositeNode.class); - Collection errors = new ArrayList<>(); - RpcResult result = Rpcs.getRpcResult(true, invokeRpcResult, errors); - Future> rpcResult = Futures.immediateFuture(result); - when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult); - - //verify response msg - Boolean getMsg = new ExpectMsg("RpcResponse") { - protected Boolean match(Object in) { - if (in instanceof RpcResponse) { - return true; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - Assert.assertTrue(getMsg); - }}; - } + assertEquals("failure.cause()", TestException.class, failure.cause().getClass()); + }}; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang new file mode 100644 index 0000000000..3474e91834 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang @@ -0,0 +1,24 @@ +module test-rpc-service { + yang-version 1; + namespace "urn:test"; + prefix "rpc"; + + revision "2014-08-28" { + description + "Initial revision"; + } + + rpc test-rpc { + input { + leaf input-data { + type string; + } + } + + output { + leaf output-data { + type string; + } + } + } +} \ No newline at end of file -- 2.36.6