*/
public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
LOG.debug("Converting input composite node to xml {}", cNode);
- if (cNode == null) return BLANK;
+ if (cNode == null) {
+ return BLANK;
+ }
- if(schemaContext == null) return BLANK;
+ if(schemaContext == null) {
+ return BLANK;
+ }
Document domTree = null;
try {
*/
public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
LOG.debug("Converting output composite node to xml {}", cNode);
- if (cNode == null) return BLANK;
+ if (cNode == null) {
+ return BLANK;
+ }
- if(schemaContext == null) return BLANK;
+ if(schemaContext == null) {
+ return BLANK;
+ }
Document domTree = null;
try {
}
public static CompositeNode xmlToCompositeNode(String xml){
- if (xml==null || xml.length()==0) return null;
+ if (xml==null || xml.length()==0) {
+ return null;
+ }
Node<?> dataTree;
try {
*/
public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){
LOG.debug("Converting input xml to composite node {}", xml);
- if (xml==null || xml.length()==0) return null;
+ if (xml==null || xml.length()==0) {
+ return null;
+ }
- if(rpc == null) return null;
+ if(rpc == null) {
+ return null;
+ }
- if(schemaContext == null) return null;
+ if(schemaContext == null) {
+ return null;
+ }
CompositeNode compositeNode = null;
try {
LOG.debug("Converted xml input to list of nodes {}", dataNodes);
final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
- it.setQName(input);
+ it.setQName(rpc);
it.add(ImmutableCompositeNode.create(input, dataNodes));
compositeNode = it.toInstance();
break;
package org.opendaylight.controller.remote.rpc;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
-import com.google.common.util.concurrent.Futures;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+
import java.util.Collections;
import java.util.Set;
-public class RemoteRpcImplementation implements RpcImplementation,
- RoutedRpcDefaultImplementation {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
- private ActorRef rpcBroker;
- private SchemaContext schemaContext;
-
- public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
- this.rpcBroker = rpcBroker;
- this.schemaContext = schemaContext;
- }
-
- @Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, YangInstanceIdentifier identifier, CompositeNode input) {
- InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
-
- return executeMsg(rpcMsg);
- }
-
- @Override
- public Set<QName> getSupportedRpcs() {
- // TODO : check if we need to get this from routing registry
- return Collections.emptySet();
- }
-
- @Override
- public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
- return executeMsg(rpcMsg);
- }
-
- private ListenableFuture<RpcResult<CompositeNode>> executeMsg(Object rpcMsg) {
- ListenableFuture<RpcResult<CompositeNode>> listenableFuture = null;
-
- try {
- Object response = ActorUtil.executeOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION);
- if(response instanceof RpcResponse) {
-
- RpcResponse rpcResponse = (RpcResponse) response;
- CompositeNode result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode());
- listenableFuture = Futures.immediateFuture(RpcResultBuilder.success(result).build());
-
- } else if(response instanceof ErrorResponse) {
-
- ErrorResponse errorResponse = (ErrorResponse) response;
- Exception e = errorResponse.getException();
- final RpcResultBuilder<CompositeNode> failed = RpcResultBuilder.failed();
- failed.withError(null, null, e.getMessage(), null, null, e.getCause());
- listenableFuture = Futures.immediateFuture(failed.build());
-
- }
- } catch (Exception e) {
- LOG.error("Error occurred while invoking RPC actor {}", e);
-
- final RpcResultBuilder<CompositeNode> failed = RpcResultBuilder.failed();
- failed.withError(null, null, e.getMessage(), null, null, e.getCause());
- listenableFuture = Futures.immediateFuture(failed.build());
+public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+ private final ActorRef rpcBroker;
+ private final SchemaContext schemaContext;
+
+ public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+ this.rpcBroker = rpcBroker;
+ this.schemaContext = schemaContext;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc,
+ YangInstanceIdentifier identifier, CompositeNode input) {
+ InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
+
+ return executeMsg(rpcMsg);
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ // TODO : check if we need to get this from routing registry
+ return Collections.emptySet();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
+ return executeMsg(rpcMsg);
}
- return listenableFuture;
- }
+ private ListenableFuture<RpcResult<CompositeNode>> executeMsg(InvokeRpc rpcMsg) {
+
+ final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
+
+ scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
+ new Timeout(ActorUtil.ASK_DURATION));
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("InvokeRpc failed", failure);
+
+ RpcResult<CompositeNode> rpcResult;
+ if(failure instanceof RpcErrorsException) {
+ rpcResult = RpcResultBuilder.<CompositeNode>failed().withRpcErrors(
+ ((RpcErrorsException)failure).getRpcErrors()).build();
+ } else {
+ rpcResult = RpcResultBuilder.<CompositeNode>failed().withError(
+ ErrorType.RPC, failure.getMessage(), failure).build();
+ }
+
+ listenableFuture.set(rpcResult);
+ return;
+ }
+
+ RpcResponse rpcReply = (RpcResponse)reply;
+ CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode());
+ listenableFuture.set(RpcResultBuilder.success(result).build());
+ }
+ };
+
+ future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
+
+ return listenableFuture;
+ }
}
package org.opendaylight.controller.remote.rpc;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Pair;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import akka.util.Timeout;
+
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
public class RpcBroker extends AbstractUntypedActor {
- private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
- private final Broker.ProviderSession brokerSession;
- private final ActorRef rpcRegistry;
- private SchemaContext schemaContext;
-
- private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
- this.brokerSession = brokerSession;
- this.rpcRegistry = rpcRegistry;
- this.schemaContext = schemaContext;
- }
-
- public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
- return Props.create(new Creator<RpcBroker>(){
-
- @Override
- public RpcBroker create() throws Exception {
- return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
- }
- });
- }
- @Override
- protected void handleReceive(Object message) throws Exception {
- if(message instanceof InvokeRpc) {
- invokeRemoteRpc((InvokeRpc) message);
- } else if(message instanceof ExecuteRpc) {
- executeRpc((ExecuteRpc) message);
+ private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
+ private final Broker.ProviderSession brokerSession;
+ private final ActorRef rpcRegistry;
+ private final SchemaContext schemaContext;
+
+ private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
+ SchemaContext schemaContext) {
+ this.brokerSession = brokerSession;
+ this.rpcRegistry = rpcRegistry;
+ this.schemaContext = schemaContext;
}
- }
-
- private void invokeRemoteRpc(InvokeRpc msg) {
- // Look up the remote actor to execute rpc
- LOG.debug("Looking up the remote actor for route {}", msg);
- try {
- // Find router
- RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
- RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId);
- RpcRegistry.Messages.FindRoutersReply rpcReply =
- (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-
- List<Pair<ActorRef, Long>> actorRefList = rpcReply.getRouterWithUpdateTime();
-
- if(actorRefList == null || actorRefList.isEmpty()) {
- LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc());
-
- getSender().tell(new ErrorResponse(
- new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
- } else {
- RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
- ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
- Object operationRes = ActorUtil.executeOperation(logic.select(),
- executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
+ public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
+ SchemaContext schemaContext) {
+ return Props.create(new RpcBrokerCreator(brokerSession, rpcRegistry, schemaContext));
+ }
- getSender().tell(operationRes, self());
- }
- } catch (Exception e) {
- LOG.error("invokeRemoteRpc: {}", e);
- getSender().tell(new ErrorResponse(e), self());
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if(message instanceof InvokeRpc) {
+ invokeRemoteRpc((InvokeRpc) message);
+ } else if(message instanceof ExecuteRpc) {
+ executeRpc((ExecuteRpc) message);
+ }
}
- }
+ private void invokeRemoteRpc(final InvokeRpc msg) {
+ LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+
+ RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
+ null, msg.getRpc(), msg.getIdentifier());
+ RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+
+ scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
+ new Timeout(ActorUtil.LOCAL_ASK_DURATION));
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
- private void executeRpc(ExecuteRpc msg) {
- LOG.debug("Executing rpc for rpc {}", msg.getRpc());
- try {
- Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(),
- XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
- RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
- CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
- getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
- } catch (Exception e) {
- LOG.error("executeRpc: {}", e);
- getSender().tell(new ErrorResponse(e), self());
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("FindRouters failed", failure);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ return;
+ }
+
+ RpcRegistry.Messages.FindRoutersReply findReply =
+ (RpcRegistry.Messages.FindRoutersReply)reply;
+
+ List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
+
+ if(actorRefList == null || actorRefList.isEmpty()) {
+ String message = String.format(
+ "No remote implementation found for rpc %s", msg.getRpc());
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+ message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+ "operation-not-supported", message)))), self);
+ return;
+ }
+
+ finishInvokeRpc(actorRefList, msg, sender, self);
+ }
+ };
+
+ future.onComplete(onComplete, getContext().dispatcher());
}
- }
+ protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
+ final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
+
+ RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
+
+ ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
+ schemaContext), msg.getRpc());
+
+ scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
+ new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object reply) throws Throwable {
+ if(failure != null) {
+ LOG.error("ExecuteRpc failed", failure);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
+ return;
+ }
+
+ sender.tell(reply, self);
+ }
+ };
+
+ future.onComplete(onComplete, getContext().dispatcher());
+ }
+
+ private void executeRpc(final ExecuteRpc msg) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+
+ Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
+ XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
+ schemaContext));
+
+ ListenableFuture<RpcResult<CompositeNode>> listenableFuture =
+ JdkFutureAdapters.listenInPoolThread(future);
+
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+
+ Futures.addCallback(listenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+ @Override
+ public void onSuccess(RpcResult<CompositeNode> result) {
+ if(result.isSuccessful()) {
+ sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(),
+ schemaContext)), self);
+ } else {
+ String message = String.format("Execution of RPC %s failed", msg.getRpc());
+ Collection<RpcError> errors = result.getErrors();
+ if(errors == null || errors.size() == 0) {
+ errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+ null, message));
+ }
+
+ sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+ message, errors)), self);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ });
+ }
+
+ private static class RpcBrokerCreator implements Creator<RpcBroker> {
+ private static final long serialVersionUID = 1L;
+
+ final Broker.ProviderSession brokerSession;
+ final ActorRef rpcRegistry;
+ final SchemaContext schemaContext;
+
+ RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry,
+ SchemaContext schemaContext) {
+ this.brokerSession = brokerSession;
+ this.rpcRegistry = rpcRegistry;
+ this.schemaContext = schemaContext;
+ }
+
+ @Override
+ public RpcBroker create() throws Exception {
+ return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * An Exception for transferring RpcErrors.
+ *
+ * @author Thomas Pantelis
+ */
+public class RpcErrorsException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ private static class RpcErrorData implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ final ErrorSeverity severity;
+ final ErrorType errorType;
+ final String tag;
+ final String applicationTag;
+ final String message;
+ final String info;
+ final Throwable cause;
+
+ RpcErrorData(ErrorSeverity severity, ErrorType errorType, String tag,
+ String applicationTag, String message, String info, Throwable cause) {
+ this.severity = severity;
+ this.errorType = errorType;
+ this.tag = tag;
+ this.applicationTag = applicationTag;
+ this.message = message;
+ this.info = info;
+ this.cause = cause;
+ }
+ }
+
+ private final List<RpcErrorData> rpcErrorDataList = new ArrayList<>();
+
+ public RpcErrorsException(String message, Iterable<RpcError> rpcErrors) {
+ super(message);
+
+ for(RpcError rpcError: rpcErrors) {
+ rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(),
+ rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(),
+ rpcError.getInfo(), rpcError.getCause()));
+ }
+ }
+
+ public Collection<RpcError> getRpcErrors() {
+ Collection<RpcError> rpcErrors = new ArrayList<>();
+ for(RpcErrorData ed: rpcErrorDataList) {
+ RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
+ RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag,
+ ed.info, ed.cause) :
+ RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag,
+ ed.info, ed.cause);
+ rpcErrors.add(rpcError);
+ }
+
+ return rpcErrors;
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.messages;
-
-import com.google.common.base.Preconditions;
-
-import java.io.Serializable;
-
-public class ErrorResponse implements Serializable {
-
- private final Exception exception;
-
- public ErrorResponse(final Exception e) {
- Preconditions.checkNotNull(e, "Exception should be present for error message");
- this.exception = e;
- }
-
- public Exception getException() {
- return exception;
- }
-}
*/
package org.opendaylight.controller.remote.rpc.utils;
-import akka.actor.ActorRef;
-import akka.util.Timeout;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
-import static akka.pattern.Patterns.ask;
-
public class ActorUtil {
public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
- public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
- public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
- public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
public static final String MAILBOX = "bounded-mailbox";
-
-
- /**
- * Executes an operation on a local actor and wait for it's response
- *
- * @param actor
- * @param message
- * @param askDuration
- * @param awaitDuration
- * @return The response of the operation
- */
- public static Object executeOperation(ActorRef actor, Object message,
- FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception {
- Future<Object> future =
- ask(actor, message, new Timeout(askDuration));
-
- return Await.result(future, awaitDuration);
- }
-
-
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Base class for RPC tests.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractRpcTest {
+ static final String TEST_REV = "2014-08-28";
+ static final String TEST_NS = "urn:test";
+ static final URI TEST_URI = URI.create(TEST_NS);
+ static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-rpc");
+ static final QName TEST_RPC_INPUT = QName.create(TEST_NS, TEST_REV, "input");
+ static final QName TEST_RPC_INPUT_DATA = QName.create(TEST_NS, TEST_REV, "input-data");
+ static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output");
+ static final QName TEST_RPC_OUTPUT_DATA = new QName(TEST_URI, "output-data");
+
+ static ActorSystem node1;
+ static ActorSystem node2;
+
+ protected ActorRef rpcBroker1;
+ protected JavaTestKit probeReg1;
+ protected ActorRef rpcBroker2;
+ protected JavaTestKit probeReg2;
+ protected Broker.ProviderSession brokerSession;
+ protected SchemaContext schemaContext;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
+ node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(node1);
+ JavaTestKit.shutdownActorSystem(node2);
+ node1 = null;
+ node2 = null;
+ }
+
+ @Before
+ public void setUp() {
+ schemaContext = new YangParserImpl().parseFiles(Arrays.asList(
+ new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath())));
+
+ brokerSession = Mockito.mock(Broker.ProviderSession.class);
+ probeReg1 = new JavaTestKit(node1);
+ rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
+ probeReg2 = new JavaTestKit(node2);
+ rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
+
+ }
+
+ static void assertRpcErrorEquals(RpcError rpcError, ErrorSeverity severity,
+ ErrorType errorType, String tag, String message, String applicationTag, String info,
+ String causeMsg) {
+ assertEquals("getSeverity", severity, rpcError.getSeverity());
+ assertEquals("getErrorType", errorType, rpcError.getErrorType());
+ assertEquals("getTag", tag, rpcError.getTag());
+ assertTrue("getMessage contains " + message, rpcError.getMessage().contains(message));
+ assertEquals("getApplicationTag", applicationTag, rpcError.getApplicationTag());
+ assertEquals("getInfo", info, rpcError.getInfo());
+
+ if(causeMsg == null) {
+ assertNull("Unexpected cause " + rpcError.getCause(), rpcError.getCause());
+ } else {
+ assertEquals("Cause message", causeMsg, rpcError.getCause().getMessage());
+ }
+ }
+
+ static void assertCompositeNodeEquals(CompositeNode exp, CompositeNode actual) {
+ assertEquals("NodeType getNamespace", exp.getNodeType().getNamespace(),
+ actual.getNodeType().getNamespace());
+ assertEquals("NodeType getLocalName", exp.getNodeType().getLocalName(),
+ actual.getNodeType().getLocalName());
+ for(Node<?> child: exp.getValue()) {
+ List<Node<?>> c = actual.get(child.getNodeType());
+ assertNotNull("Missing expected child " + child.getNodeType(), c);
+ if(child instanceof CompositeNode) {
+ assertCompositeNodeEquals((CompositeNode) child, (CompositeNode)c.get(0));
+ } else {
+ assertEquals("Value for Node " + child.getNodeType(), child.getValue(),
+ c.get(0).getValue());
+ }
+ }
+ }
+
+ static CompositeNode makeRPCInput(String data) {
+ CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
+ .setQName(TEST_RPC_INPUT).addLeaf(TEST_RPC_INPUT_DATA, data);
+ return ImmutableCompositeNode.create(
+ TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
+ }
+
+ static CompositeNode makeRPCOutput(String data) {
+ CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
+ .setQName(TEST_RPC_OUTPUT).addLeaf(TEST_RPC_OUTPUT_DATA, data);
+ return ImmutableCompositeNode.create(
+ TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
+ }
+
+ static void assertFailedRpcResult(RpcResult<CompositeNode> rpcResult, ErrorSeverity severity,
+ ErrorType errorType, String tag, String message, String applicationTag, String info,
+ String causeMsg) {
+
+ assertNotNull("RpcResult was null", rpcResult);
+ assertEquals("isSuccessful", false, rpcResult.isSuccessful());
+ Collection<RpcError> rpcErrors = rpcResult.getErrors();
+ assertEquals("RpcErrors count", 1, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message,
+ applicationTag, info, causeMsg);
+ }
+
+ static void assertSuccessfulRpcResult(RpcResult<CompositeNode> rpcResult,
+ CompositeNode expOutput) {
+
+ assertNotNull("RpcResult was null", rpcResult);
+ assertEquals("isSuccessful", true, rpcResult.isSuccessful());
+ assertCompositeNodeEquals(expOutput, rpcResult.getResult());
+ }
+
+ static class TestException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ static final String MESSAGE = "mock error";
+
+ TestException() {
+ super(MESSAGE);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import static org.junit.Assert.assertEquals;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.xml.codec.XmlUtils;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import akka.testkit.JavaTestKit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/***
+ * Unit tests for RemoteRpcImplementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteRpcImplementationTest extends AbstractRpcTest {
+
+ @Test
+ public void testInvokeRpc() throws Exception {
+ final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+ try {
+ RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+ probeReg1.getRef(), schemaContext);
+
+ final CompositeNode input = makeRPCInput("foo");
+ final CompositeNode output = makeRPCOutput("bar");
+ final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
+
+ ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
+
+ RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+ assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
+
+ assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
+ assertEquals("getInput", input, invokeRpcMsg.get().getInput());
+ } finally {
+ if(assertError.get() != null) {
+ throw assertError.get();
+ }
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithIdentifier() throws Exception {
+ final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+ try {
+ RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+ probeReg1.getRef(), schemaContext);
+
+ QName instanceQName = new QName(new URI("ns"), "instance");
+ YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
+
+ CompositeNode input = makeRPCInput("foo");
+ CompositeNode output = makeRPCOutput("bar");
+ final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
+
+ ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(
+ TEST_RPC, identifier, input);
+
+ RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+ assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
+
+ assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
+ assertEquals("getInput", input, invokeRpcMsg.get().getInput());
+ assertEquals("getRoute", identifier, invokeRpcMsg.get().getIdentifier());
+ } finally {
+ if(assertError.get() != null) {
+ throw assertError.get();
+ }
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithRpcErrorsException() throws Exception {
+ final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+ try {
+ RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+ probeReg1.getRef(), schemaContext);
+
+ final CompositeNode input = makeRPCInput("foo");
+
+ setupInvokeRpcErrorReply(assertError, new RpcErrorsException(
+ "mock", Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, "tag",
+ "error", "appTag", "info", null))));
+
+ ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
+
+ RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+ assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+ "error", "appTag", "info", null);
+ } finally {
+ if(assertError.get() != null) {
+ throw assertError.get();
+ }
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithOtherException() throws Exception {
+ final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+ try {
+ RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+ probeReg1.getRef(), schemaContext);
+
+ final CompositeNode input = makeRPCInput("foo");
+
+ setupInvokeRpcErrorReply(assertError, new TestException());
+
+ ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
+
+ RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+ assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "operation-failed",
+ TestException.MESSAGE, null, null, TestException.MESSAGE);
+ } finally {
+ if(assertError.get() != null) {
+ throw assertError.get();
+ }
+ }
+ }
+
+ private AtomicReference<InvokeRpc> setupInvokeRpcReply(
+ final AtomicReference<AssertionError> assertError, final CompositeNode output) {
+ return setupInvokeRpcReply(assertError, output, null);
+ }
+
+ private AtomicReference<InvokeRpc> setupInvokeRpcErrorReply(
+ final AtomicReference<AssertionError> assertError, final Exception error) {
+ return setupInvokeRpcReply(assertError, null, error);
+ }
+
+ private AtomicReference<InvokeRpc> setupInvokeRpcReply(
+ final AtomicReference<AssertionError> assertError, final CompositeNode output,
+ final Exception error) {
+ final AtomicReference<InvokeRpc> invokeRpcMsg = new AtomicReference<>();
+
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ invokeRpcMsg.set(probeReg1.expectMsgClass(
+ JavaTestKit.duration("5 seconds"), InvokeRpc.class));
+
+ if(output != null) {
+ probeReg1.reply(new RpcResponse(XmlUtils.outputCompositeNodeToXml(
+ output, schemaContext)));
+ } else {
+ probeReg1.reply(new akka.actor.Status.Failure(error));
+ }
+
+ } catch(AssertionError e) {
+ assertError.set(e);
+ }
+ }
+
+ }.start();
+
+ return invokeRpcMsg;
+ }
+}
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.ModifyAction;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Future;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.any;
+
+public class RpcBrokerTest extends AbstractRpcTest {
+
+ @Test
+ public void testInvokeRpcWithNoRemoteActor() throws Exception {
+ new JavaTestKit(node1) {{
+ CompositeNode input = makeRPCInput("foo");
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Collections.<Pair<ActorRef, Long>>emptyList()));
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+ }};
+ }
+
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ //@Test
+ public void testInvokeRpc() throws URISyntaxException {
+ new JavaTestKit(node1) {{
+ QName instanceQName = new QName(new URI("ns"), "instance");
+
+ CompositeNode invokeRpcResult = makeRPCOutput("bar");
+ RpcResult<CompositeNode> rpcResult =
+ RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+ ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+ when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ // invoke rpc
+ CompositeNode input = makeRPCInput("foo");
+ YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
+
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+ RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+ assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+ XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+ assertCompositeNodeEquals(input, inputCaptor.getValue());
+ }};
+ }
+
+ @Test
+ public void testInvokeRpcWithNoOutput() {
+ new JavaTestKit(node1) {{
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+ RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+ assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
+ }};
+ }
+
+ @Test
+ public void testInvokeRpcWithExecuteFailure() {
+ new JavaTestKit(node1) {{
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+ .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
+ new Exception("mock"))
+ .build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+ RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+ List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+ assertEquals("RpcErrors count", 1, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+ "error", "appTag", "info", "mock");
+ }};
+ }
+
+ @Test
+ public void testInvokeRpcWithFindRoutersFailure() {
+ new JavaTestKit(node1) {{
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testExecuteRpc() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ CompositeNode invokeRpcResult = makeRPCOutput("bar");
+ RpcResult<CompositeNode> rpcResult =
+ RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+ ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+ when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+ assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+ XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+ }};
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithRpcErrors() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+ .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
+ new Exception("mock"))
+ .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
+ .build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+ RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+ List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+ assertEquals("RpcErrors count", 2, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
+ "error", "appTag1", "info1", "mock");
+ assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
+ "warning", "appTag2", "info2", null);
+ }};
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithNoRpcErrors() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+ RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+ List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+ assertEquals("RpcErrors count", 1, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
+ "operation-failed", "failed", null, null, null);
+ }};
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithException() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
+ new TestException()));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
-public class RpcBrokerTest {
-
- static ActorSystem node1;
- static ActorSystem node2;
- private ActorRef rpcBroker1;
- private JavaTestKit probeReg1;
- private ActorRef rpcBroker2;
- private JavaTestKit probeReg2;
- private Broker.ProviderSession brokerSession;
-
-
- @BeforeClass
- public static void setup() throws InterruptedException {
- node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
- node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(node1);
- JavaTestKit.shutdownActorSystem(node2);
- node1 = null;
- node2 = null;
- }
-
- @Before
- public void createActor() {
- brokerSession = Mockito.mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- probeReg1 = new JavaTestKit(node1);
- rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
- probeReg2 = new JavaTestKit(node2);
- rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
-
- }
- @Test
- public void testInvokeRpcError() throws Exception {
- new JavaTestKit(node1) {{
- QName rpc = new QName(new URI("noactor1"), "noactor1");
- CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-
-
- InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
- rpcBroker1.tell(invokeMsg, getRef());
- probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(new ArrayList<Pair<ActorRef, Long>>()));
-
- Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
- protected Boolean match(Object in) {
- if (in instanceof ErrorResponse) {
- ErrorResponse reply = (ErrorResponse)in;
- return reply.getException().getMessage().contains("No remote actor found for rpc execution of :");
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
-
- }};
- }
-
-
- /**
- * This test method invokes and executes the remote rpc
- */
-
- @Test
- public void testInvokeRpc() throws URISyntaxException {
- new JavaTestKit(node1) {{
- QName rpc = new QName(new URI("noactor1"), "noactor1");
- // invoke rpc
- CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
- InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
- rpcBroker1.tell(invokeMsg, getRef());
-
- probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
- List<Pair<ActorRef, Long>> routerList = new ArrayList<Pair<ActorRef, Long>>();
-
- routerList.add(new Pair<ActorRef, Long>(rpcBroker2, 200L));
-
- probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(routerList));
-
- CompositeNode invokeRpcResult = mock(CompositeNode.class);
- Collection<RpcError> errors = new ArrayList<>();
- RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
- Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
- when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
-
- //verify response msg
- Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
- protected Boolean match(Object in) {
- if (in instanceof RpcResponse) {
- return true;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
- }};
- }
+ assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+ }};
+ }
}
--- /dev/null
+module test-rpc-service {
+ yang-version 1;
+ namespace "urn:test";
+ prefix "rpc";
+
+ revision "2014-08-28" {
+ description
+ "Initial revision";
+ }
+
+ rpc test-rpc {
+ input {
+ leaf input-data {
+ type string;
+ }
+ }
+
+ output {
+ leaf output-data {
+ type string;
+ }
+ }
+ }
+}
\ No newline at end of file