Bug 1637: Change Rpc actor calls to async 43/10443/6
authortpantelis <tpanteli@brocade.com>
Mon, 25 Aug 2014 00:29:58 +0000 (20:29 -0400)
committerMoiz Raja <moraja@cisco.com>
Thu, 4 Sep 2014 15:22:58 +0000 (15:22 +0000)
Changed RemoteRpcImplementation and RpcBroker to use async calls and
modified/created unit tests.

Added an RpcErrorsException to transfer RpcError info on failure. This
is now used in lieu of ErrorResponse to communicate failures.

Change-Id: Ib0bbda1867caff9b6c584b9b8a9b336f8acf53d7
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang [new file with mode: 0644]

index 5848561676b34ff68b744afa4f5f76b2acd680ca..ea8f4a3ef19810a6d95ebc4211b0f4569b6e2716 100644 (file)
@@ -75,9 +75,13 @@ public class XmlUtils {
    */
   public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
     LOG.debug("Converting input composite node to xml {}", cNode);
-    if (cNode == null) return BLANK;
+    if (cNode == null) {
+        return BLANK;
+    }
 
-    if(schemaContext == null) return BLANK;
+    if(schemaContext == null) {
+        return BLANK;
+    }
 
     Document domTree = null;
     try {
@@ -108,9 +112,13 @@ public class XmlUtils {
    */
   public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
     LOG.debug("Converting output composite node to xml {}", cNode);
-    if (cNode == null) return BLANK;
+    if (cNode == null) {
+        return BLANK;
+    }
 
-    if(schemaContext == null) return BLANK;
+    if(schemaContext == null) {
+        return BLANK;
+    }
 
     Document domTree = null;
     try {
@@ -150,7 +158,9 @@ public class XmlUtils {
   }
 
   public static CompositeNode xmlToCompositeNode(String xml){
-    if (xml==null || xml.length()==0) return null;
+    if (xml==null || xml.length()==0) {
+        return null;
+    }
 
     Node<?> dataTree;
     try {
@@ -179,11 +189,17 @@ public class XmlUtils {
    */
   public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml,  SchemaContext schemaContext){
     LOG.debug("Converting input xml to composite node {}", xml);
-    if (xml==null || xml.length()==0) return null;
+    if (xml==null || xml.length()==0) {
+        return null;
+    }
 
-    if(rpc == null) return null;
+    if(rpc == null) {
+        return null;
+    }
 
-    if(schemaContext == null) return null;
+    if(schemaContext == null) {
+        return null;
+    }
 
     CompositeNode compositeNode = null;
     try {
@@ -213,7 +229,7 @@ public class XmlUtils {
           LOG.debug("Converted xml input to list of nodes  {}", dataNodes);
 
           final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
-          it.setQName(input);
+          it.setQName(rpc);
           it.add(ImmutableCompositeNode.create(input, dataNodes));
           compositeNode = it.toInstance();
           break;
index 4496bd3263f9f80c6d1594e2e51de474ecb22a46..7d7dbf0f3a58bc404882ad78186340d8eef2aba9 100644 (file)
@@ -1,9 +1,13 @@
 package org.opendaylight.controller.remote.rpc;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
-import com.google.common.util.concurrent.Futures;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
@@ -13,73 +17,82 @@ import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.ExecutionContext;
+
 import java.util.Collections;
 import java.util.Set;
 
-public class RemoteRpcImplementation implements RpcImplementation,
-    RoutedRpcDefaultImplementation {
-  private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
-  private ActorRef rpcBroker;
-  private SchemaContext schemaContext;
-
-  public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
-    this.rpcBroker = rpcBroker;
-    this.schemaContext = schemaContext;
-  }
-
-  @Override
-  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, YangInstanceIdentifier identifier, CompositeNode input) {
-    InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
-
-    return executeMsg(rpcMsg);
-  }
-
-  @Override
-  public Set<QName> getSupportedRpcs() {
-    // TODO : check if we need to get this from routing registry
-    return Collections.emptySet();
-  }
-
-  @Override
-  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-    InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
-    return executeMsg(rpcMsg);
-  }
-
-  private ListenableFuture<RpcResult<CompositeNode>> executeMsg(Object rpcMsg) {
-    ListenableFuture<RpcResult<CompositeNode>> listenableFuture = null;
-
-    try {
-      Object response = ActorUtil.executeOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION);
-      if(response instanceof RpcResponse) {
-
-        RpcResponse rpcResponse = (RpcResponse) response;
-        CompositeNode result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode());
-        listenableFuture = Futures.immediateFuture(RpcResultBuilder.success(result).build());
-
-      } else if(response instanceof ErrorResponse) {
-
-        ErrorResponse errorResponse = (ErrorResponse) response;
-        Exception e = errorResponse.getException();
-        final RpcResultBuilder<CompositeNode> failed = RpcResultBuilder.failed();
-        failed.withError(null, null, e.getMessage(), null, null, e.getCause());
-        listenableFuture = Futures.immediateFuture(failed.build());
-
-      }
-    } catch (Exception e) {
-      LOG.error("Error occurred while invoking RPC actor {}", e);
-
-      final RpcResultBuilder<CompositeNode> failed = RpcResultBuilder.failed();
-      failed.withError(null, null, e.getMessage(), null, null, e.getCause());
-      listenableFuture = Futures.immediateFuture(failed.build());
+public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+    private final ActorRef rpcBroker;
+    private final SchemaContext schemaContext;
+
+    public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+        this.rpcBroker = rpcBroker;
+        this.schemaContext = schemaContext;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc,
+            YangInstanceIdentifier identifier, CompositeNode input) {
+        InvokeRpc rpcMsg = new InvokeRpc(rpc, identifier, input);
+
+        return executeMsg(rpcMsg);
+    }
+
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        // TODO : check if we need to get this from routing registry
+        return Collections.emptySet();
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+        InvokeRpc rpcMsg = new InvokeRpc(rpc, null, input);
+        return executeMsg(rpcMsg);
     }
 
-    return listenableFuture;
-  }
+    private ListenableFuture<RpcResult<CompositeNode>> executeMsg(InvokeRpc rpcMsg) {
+
+        final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
+
+        scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
+                new Timeout(ActorUtil.ASK_DURATION));
+
+        OnComplete<Object> onComplete = new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object reply) throws Throwable {
+                if(failure != null) {
+                    LOG.error("InvokeRpc failed", failure);
+
+                    RpcResult<CompositeNode> rpcResult;
+                    if(failure instanceof RpcErrorsException) {
+                        rpcResult = RpcResultBuilder.<CompositeNode>failed().withRpcErrors(
+                                ((RpcErrorsException)failure).getRpcErrors()).build();
+                    } else {
+                        rpcResult = RpcResultBuilder.<CompositeNode>failed().withError(
+                                ErrorType.RPC, failure.getMessage(), failure).build();
+                    }
+
+                    listenableFuture.set(rpcResult);
+                    return;
+                }
+
+                RpcResponse rpcReply = (RpcResponse)reply;
+                CompositeNode result = XmlUtils.xmlToCompositeNode(rpcReply.getResultCompositeNode());
+                listenableFuture.set(RpcResultBuilder.success(result).build());
+            }
+        };
+
+        future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global());
+
+        return listenableFuture;
+    }
 }
index 4ec96c29cd2510175defa6327a235ffecb519b28..2aca655d2628eb9d89295d09419d0cd44f7491d7 100644 (file)
@@ -8,11 +8,14 @@
 
 package org.opendaylight.controller.remote.rpc;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Pair;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import akka.util.Timeout;
+
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
@@ -23,12 +26,23 @@ import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
 import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Future;
 
@@ -38,81 +52,159 @@ import java.util.concurrent.Future;
 
 public class RpcBroker extends AbstractUntypedActor {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
-  private final Broker.ProviderSession brokerSession;
-  private final ActorRef rpcRegistry;
-  private SchemaContext schemaContext;
-
-  private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
-    this.brokerSession = brokerSession;
-    this.rpcRegistry = rpcRegistry;
-    this.schemaContext = schemaContext;
-  }
-
-  public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
-    return Props.create(new Creator<RpcBroker>(){
-
-      @Override
-      public RpcBroker create() throws Exception {
-        return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
-      }
-    });
-  }
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-   if(message instanceof InvokeRpc) {
-      invokeRemoteRpc((InvokeRpc) message);
-    } else if(message instanceof ExecuteRpc) {
-      executeRpc((ExecuteRpc) message);
+    private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
+    private final Broker.ProviderSession brokerSession;
+    private final ActorRef rpcRegistry;
+    private final SchemaContext schemaContext;
+
+    private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
+            SchemaContext schemaContext) {
+        this.brokerSession = brokerSession;
+        this.rpcRegistry = rpcRegistry;
+        this.schemaContext = schemaContext;
     }
-  }
-
-  private void invokeRemoteRpc(InvokeRpc msg) {
-    // Look up the remote actor to execute rpc
-    LOG.debug("Looking up the remote actor for route {}", msg);
-    try {
-      // Find router
-      RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
-      RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId);
-      RpcRegistry.Messages.FindRoutersReply rpcReply =
-          (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-
-      List<Pair<ActorRef, Long>> actorRefList = rpcReply.getRouterWithUpdateTime();
-
-      if(actorRefList == null || actorRefList.isEmpty()) {
-        LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc());
-
-        getSender().tell(new ErrorResponse(
-            new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
-      } else {
-        RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
 
-        ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
-        Object operationRes = ActorUtil.executeOperation(logic.select(),
-            executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
+    public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
+            SchemaContext schemaContext) {
+        return Props.create(new RpcBrokerCreator(brokerSession, rpcRegistry, schemaContext));
+    }
 
-        getSender().tell(operationRes, self());
-      }
-    } catch (Exception e) {
-        LOG.error("invokeRemoteRpc: {}", e);
-        getSender().tell(new ErrorResponse(e), self());
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if(message instanceof InvokeRpc) {
+            invokeRemoteRpc((InvokeRpc) message);
+        } else if(message instanceof ExecuteRpc) {
+            executeRpc((ExecuteRpc) message);
+        }
     }
-  }
 
+    private void invokeRemoteRpc(final InvokeRpc msg) {
+        LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+
+        RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
+                null, msg.getRpc(), msg.getIdentifier());
+        RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
+
+        scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
+                new Timeout(ActorUtil.LOCAL_ASK_DURATION));
 
+        final ActorRef sender = getSender();
+        final ActorRef self = self();
 
-  private void executeRpc(ExecuteRpc msg) {
-    LOG.debug("Executing rpc for rpc {}", msg.getRpc());
-    try {
-      Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(),
-          XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
-      RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
-      CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
-      getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
-    } catch (Exception e) {
-      LOG.error("executeRpc: {}", e);
-      getSender().tell(new ErrorResponse(e), self());
+        OnComplete<Object> onComplete = new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object reply) throws Throwable {
+                if(failure != null) {
+                    LOG.error("FindRouters failed", failure);
+                    sender.tell(new akka.actor.Status.Failure(failure), self);
+                    return;
+                }
+
+                RpcRegistry.Messages.FindRoutersReply findReply =
+                                                (RpcRegistry.Messages.FindRoutersReply)reply;
+
+                List<Pair<ActorRef, Long>> actorRefList = findReply.getRouterWithUpdateTime();
+
+                if(actorRefList == null || actorRefList.isEmpty()) {
+                    String message = String.format(
+                            "No remote implementation found for rpc %s",  msg.getRpc());
+                    sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+                            message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+                                    "operation-not-supported", message)))), self);
+                    return;
+                }
+
+                finishInvokeRpc(actorRefList, msg, sender, self);
+            }
+        };
+
+        future.onComplete(onComplete, getContext().dispatcher());
     }
-  }
 
+    protected void finishInvokeRpc(final List<Pair<ActorRef, Long>> actorRefList,
+            final InvokeRpc msg, final ActorRef sender, final ActorRef self) {
+
+        RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
+
+        ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
+                schemaContext), msg.getRpc());
+
+        scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
+                new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+
+        OnComplete<Object> onComplete = new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object reply) throws Throwable {
+                if(failure != null) {
+                    LOG.error("ExecuteRpc failed", failure);
+                    sender.tell(new akka.actor.Status.Failure(failure), self);
+                    return;
+                }
+
+                sender.tell(reply, self);
+            }
+        };
+
+        future.onComplete(onComplete, getContext().dispatcher());
+    }
+
+    private void executeRpc(final ExecuteRpc msg) {
+        LOG.debug("Executing rpc {}", msg.getRpc());
+
+        Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
+                XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
+                        schemaContext));
+
+        ListenableFuture<RpcResult<CompositeNode>> listenableFuture =
+                JdkFutureAdapters.listenInPoolThread(future);
+
+        final ActorRef sender = getSender();
+        final ActorRef self = self();
+
+        Futures.addCallback(listenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+            @Override
+            public void onSuccess(RpcResult<CompositeNode> result) {
+                if(result.isSuccessful()) {
+                    sender.tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result.getResult(),
+                            schemaContext)), self);
+                } else {
+                    String message = String.format("Execution of RPC %s failed",  msg.getRpc());
+                    Collection<RpcError> errors = result.getErrors();
+                    if(errors == null || errors.size() == 0) {
+                        errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC,
+                                null, message));
+                    }
+
+                    sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(
+                            message, errors)), self);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("executeRpc for {} failed: {}", msg.getRpc(), t);
+                sender.tell(new akka.actor.Status.Failure(t), self);
+            }
+        });
+    }
+
+    private static class RpcBrokerCreator implements Creator<RpcBroker> {
+        private static final long serialVersionUID = 1L;
+
+        final Broker.ProviderSession brokerSession;
+        final ActorRef rpcRegistry;
+        final SchemaContext schemaContext;
+
+        RpcBrokerCreator(ProviderSession brokerSession, ActorRef rpcRegistry,
+                SchemaContext schemaContext) {
+            this.brokerSession = brokerSession;
+            this.rpcRegistry = rpcRegistry;
+            this.schemaContext = schemaContext;
+        }
+
+        @Override
+        public RpcBroker create() throws Exception {
+            return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcErrorsException.java
new file mode 100644 (file)
index 0000000..7e4d8a0
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * An Exception for transferring RpcErrors.
+ *
+ * @author Thomas Pantelis
+ */
+public class RpcErrorsException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    private static class RpcErrorData implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final ErrorSeverity severity;
+        final ErrorType errorType;
+        final String tag;
+        final String applicationTag;
+        final String message;
+        final String info;
+        final Throwable cause;
+
+        RpcErrorData(ErrorSeverity severity, ErrorType errorType, String tag,
+                String applicationTag, String message, String info, Throwable cause) {
+            this.severity = severity;
+            this.errorType = errorType;
+            this.tag = tag;
+            this.applicationTag = applicationTag;
+            this.message = message;
+            this.info = info;
+            this.cause = cause;
+        }
+    }
+
+    private final List<RpcErrorData> rpcErrorDataList = new ArrayList<>();
+
+    public RpcErrorsException(String message, Iterable<RpcError> rpcErrors) {
+        super(message);
+
+        for(RpcError rpcError: rpcErrors) {
+            rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(),
+                    rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(),
+                    rpcError.getInfo(), rpcError.getCause()));
+        }
+    }
+
+    public Collection<RpcError> getRpcErrors() {
+        Collection<RpcError> rpcErrors = new ArrayList<>();
+        for(RpcErrorData ed: rpcErrorDataList) {
+            RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
+                    RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag,
+                            ed.info, ed.cause) :
+                    RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag,
+                            ed.info, ed.cause);
+            rpcErrors.add(rpcError);
+        }
+
+        return rpcErrors;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java
deleted file mode 100644 (file)
index 2c26243..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.remote.rpc.messages;
-
-import com.google.common.base.Preconditions;
-
-import java.io.Serializable;
-
-public class ErrorResponse implements Serializable {
-
-  private final Exception exception;
-
-  public ErrorResponse(final Exception e) {
-    Preconditions.checkNotNull(e, "Exception should be present for error message");
-    this.exception = e;
-  }
-
-  public Exception getException() {
-    return exception;
-  }
-}
index ca14fecb4c4bc65d4d56f66b67b2533fd802ce2a..e2baffa1b13a02e294a8ff2f83361f41b0dd83e9 100644 (file)
@@ -8,44 +8,15 @@
  */
 package org.opendaylight.controller.remote.rpc.utils;
 
-import akka.actor.ActorRef;
-import akka.util.Timeout;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
 
-import static akka.pattern.Patterns.ask;
-
 public class ActorUtil {
     public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
     public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
     public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
-    public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
-    public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
-    public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
     public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
     public static final String MAILBOX = "bounded-mailbox";
-
-
-    /**
-     * Executes an operation on a local actor and wait for it's response
-     *
-     * @param actor
-     * @param message
-     * @param askDuration
-     * @param awaitDuration
-     * @return The response of the operation
-     */
-    public static Object executeOperation(ActorRef actor, Object message,
-                                          FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception {
-        Future<Object> future =
-                ask(actor, message, new Timeout(askDuration));
-
-        return Await.result(future, awaitDuration);
-    }
-
-
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
new file mode 100644 (file)
index 0000000..8d88682
--- /dev/null
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Base class for RPC tests.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractRpcTest {
+    static final String TEST_REV = "2014-08-28";
+    static final String TEST_NS = "urn:test";
+    static final URI TEST_URI = URI.create(TEST_NS);
+    static final QName TEST_RPC = QName.create(TEST_NS, TEST_REV, "test-rpc");
+    static final QName TEST_RPC_INPUT = QName.create(TEST_NS, TEST_REV, "input");
+    static final QName TEST_RPC_INPUT_DATA = QName.create(TEST_NS, TEST_REV, "input-data");
+    static final QName TEST_RPC_OUTPUT = QName.create(TEST_NS, TEST_REV, "output");
+    static final QName TEST_RPC_OUTPUT_DATA = new QName(TEST_URI, "output-data");
+
+    static ActorSystem node1;
+    static ActorSystem node2;
+
+    protected ActorRef rpcBroker1;
+    protected JavaTestKit probeReg1;
+    protected ActorRef rpcBroker2;
+    protected JavaTestKit probeReg2;
+    protected Broker.ProviderSession brokerSession;
+    protected SchemaContext schemaContext;
+
+    @BeforeClass
+    public static void setup() throws InterruptedException {
+        node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
+        node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+    }
+
+    @AfterClass
+    public static void teardown() {
+        JavaTestKit.shutdownActorSystem(node1);
+        JavaTestKit.shutdownActorSystem(node2);
+        node1 = null;
+        node2 = null;
+    }
+
+    @Before
+    public void setUp() {
+        schemaContext = new YangParserImpl().parseFiles(Arrays.asList(
+                new File(RpcBrokerTest.class.getResource("/test-rpc.yang").getPath())));
+
+        brokerSession = Mockito.mock(Broker.ProviderSession.class);
+        probeReg1 = new JavaTestKit(node1);
+        rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
+        probeReg2 = new JavaTestKit(node2);
+        rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
+
+    }
+
+    static void assertRpcErrorEquals(RpcError rpcError, ErrorSeverity severity,
+            ErrorType errorType, String tag, String message, String applicationTag, String info,
+            String causeMsg) {
+        assertEquals("getSeverity", severity, rpcError.getSeverity());
+        assertEquals("getErrorType", errorType, rpcError.getErrorType());
+        assertEquals("getTag", tag, rpcError.getTag());
+        assertTrue("getMessage contains " + message, rpcError.getMessage().contains(message));
+        assertEquals("getApplicationTag", applicationTag, rpcError.getApplicationTag());
+        assertEquals("getInfo", info, rpcError.getInfo());
+
+        if(causeMsg == null) {
+            assertNull("Unexpected cause " + rpcError.getCause(), rpcError.getCause());
+        } else {
+            assertEquals("Cause message", causeMsg, rpcError.getCause().getMessage());
+        }
+    }
+
+    static void assertCompositeNodeEquals(CompositeNode exp, CompositeNode actual) {
+        assertEquals("NodeType getNamespace", exp.getNodeType().getNamespace(),
+                actual.getNodeType().getNamespace());
+        assertEquals("NodeType getLocalName", exp.getNodeType().getLocalName(),
+                actual.getNodeType().getLocalName());
+        for(Node<?> child: exp.getValue()) {
+            List<Node<?>> c = actual.get(child.getNodeType());
+            assertNotNull("Missing expected child " + child.getNodeType(), c);
+            if(child instanceof CompositeNode) {
+                assertCompositeNodeEquals((CompositeNode) child, (CompositeNode)c.get(0));
+            } else {
+                assertEquals("Value for Node " + child.getNodeType(), child.getValue(),
+                        c.get(0).getValue());
+            }
+        }
+    }
+
+    static CompositeNode makeRPCInput(String data) {
+        CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
+                .setQName(TEST_RPC_INPUT).addLeaf(TEST_RPC_INPUT_DATA, data);
+        return ImmutableCompositeNode.create(
+                TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
+    }
+
+    static CompositeNode makeRPCOutput(String data) {
+        CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder()
+                .setQName(TEST_RPC_OUTPUT).addLeaf(TEST_RPC_OUTPUT_DATA, data);
+        return ImmutableCompositeNode.create(
+                TEST_RPC, ImmutableList.<Node<?>>of(builder.toInstance()));
+    }
+
+    static void assertFailedRpcResult(RpcResult<CompositeNode> rpcResult, ErrorSeverity severity,
+            ErrorType errorType, String tag, String message, String applicationTag, String info,
+            String causeMsg) {
+
+        assertNotNull("RpcResult was null", rpcResult);
+        assertEquals("isSuccessful", false, rpcResult.isSuccessful());
+        Collection<RpcError> rpcErrors = rpcResult.getErrors();
+        assertEquals("RpcErrors count", 1, rpcErrors.size());
+        assertRpcErrorEquals(rpcErrors.iterator().next(), severity, errorType, tag, message,
+                applicationTag, info, causeMsg);
+    }
+
+    static void assertSuccessfulRpcResult(RpcResult<CompositeNode> rpcResult,
+            CompositeNode expOutput) {
+
+        assertNotNull("RpcResult was null", rpcResult);
+        assertEquals("isSuccessful", true, rpcResult.isSuccessful());
+        assertCompositeNodeEquals(expOutput, rpcResult.getResult());
+    }
+
+    static class TestException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        static final String MESSAGE = "mock error";
+
+        TestException() {
+            super(MESSAGE);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
new file mode 100644 (file)
index 0000000..6c3a57b
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import static org.junit.Assert.assertEquals;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.xml.codec.XmlUtils;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import akka.testkit.JavaTestKit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/***
+ * Unit tests for RemoteRpcImplementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteRpcImplementationTest extends AbstractRpcTest {
+
+    @Test
+    public void testInvokeRpc() throws Exception {
+        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+        try {
+            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+                    probeReg1.getRef(), schemaContext);
+
+            final CompositeNode input = makeRPCInput("foo");
+            final CompositeNode output = makeRPCOutput("bar");
+            final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
+
+            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
+
+            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+            assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
+
+            assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
+            assertEquals("getInput", input, invokeRpcMsg.get().getInput());
+        } finally {
+            if(assertError.get() != null) {
+                throw assertError.get();
+            }
+        }
+    }
+
+    @Test
+    public void testInvokeRpcWithIdentifier() throws Exception {
+        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+        try {
+            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+                    probeReg1.getRef(), schemaContext);
+
+            QName instanceQName = new QName(new URI("ns"), "instance");
+            YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
+
+            CompositeNode input = makeRPCInput("foo");
+            CompositeNode output = makeRPCOutput("bar");
+            final AtomicReference<InvokeRpc> invokeRpcMsg = setupInvokeRpcReply(assertError, output);
+
+            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(
+                    TEST_RPC, identifier, input);
+
+            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+            assertSuccessfulRpcResult(rpcResult, (CompositeNode)output.getValue().get(0));
+
+            assertEquals("getRpc", TEST_RPC, invokeRpcMsg.get().getRpc());
+            assertEquals("getInput", input, invokeRpcMsg.get().getInput());
+            assertEquals("getRoute", identifier, invokeRpcMsg.get().getIdentifier());
+        } finally {
+            if(assertError.get() != null) {
+                throw assertError.get();
+            }
+        }
+    }
+
+    @Test
+    public void testInvokeRpcWithRpcErrorsException() throws Exception {
+        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+        try {
+            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+                    probeReg1.getRef(), schemaContext);
+
+            final CompositeNode input = makeRPCInput("foo");
+
+            setupInvokeRpcErrorReply(assertError, new RpcErrorsException(
+                    "mock", Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, "tag",
+                            "error", "appTag", "info", null))));
+
+            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
+
+            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+            assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+                    "error", "appTag", "info", null);
+        } finally {
+            if(assertError.get() != null) {
+                throw assertError.get();
+            }
+        }
+    }
+
+    @Test
+    public void testInvokeRpcWithOtherException() throws Exception {
+        final AtomicReference<AssertionError> assertError = new AtomicReference<>();
+        try {
+            RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
+                    probeReg1.getRef(), schemaContext);
+
+            final CompositeNode input = makeRPCInput("foo");
+
+            setupInvokeRpcErrorReply(assertError, new TestException());
+
+            ListenableFuture<RpcResult<CompositeNode>> future = rpcImpl.invokeRpc(TEST_RPC, input);
+
+            RpcResult<CompositeNode> rpcResult = future.get(5, TimeUnit.SECONDS);
+
+            assertFailedRpcResult(rpcResult, ErrorSeverity.ERROR, ErrorType.RPC, "operation-failed",
+                    TestException.MESSAGE, null, null, TestException.MESSAGE);
+        } finally {
+            if(assertError.get() != null) {
+                throw assertError.get();
+            }
+        }
+    }
+
+    private AtomicReference<InvokeRpc> setupInvokeRpcReply(
+            final AtomicReference<AssertionError> assertError, final CompositeNode output) {
+        return setupInvokeRpcReply(assertError, output, null);
+    }
+
+    private AtomicReference<InvokeRpc> setupInvokeRpcErrorReply(
+            final AtomicReference<AssertionError> assertError, final Exception error) {
+        return setupInvokeRpcReply(assertError, null, error);
+    }
+
+    private AtomicReference<InvokeRpc> setupInvokeRpcReply(
+            final AtomicReference<AssertionError> assertError, final CompositeNode output,
+            final Exception error) {
+        final AtomicReference<InvokeRpc> invokeRpcMsg = new AtomicReference<>();
+
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    invokeRpcMsg.set(probeReg1.expectMsgClass(
+                            JavaTestKit.duration("5 seconds"), InvokeRpc.class));
+
+                    if(output != null) {
+                        probeReg1.reply(new RpcResponse(XmlUtils.outputCompositeNodeToXml(
+                                output, schemaContext)));
+                    } else {
+                        probeReg1.reply(new akka.actor.Status.Failure(error));
+                    }
+
+                } catch(AssertionError e) {
+                    assertError.set(e);
+                }
+            }
+
+        }.start();
+
+        return invokeRpcMsg;
+    }
+}
index d9a3b6a414f1666dfa3b062cf7faa54c0baed8c0..28b1b476cd0ebc4d20585b58a91877c076caf33c 100644 (file)
@@ -10,144 +10,268 @@ package org.opendaylight.controller.remote.rpc;
 
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import static org.junit.Assert.assertEquals;
 import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.ModifyAction;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Future;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.any;
+
+public class RpcBrokerTest extends AbstractRpcTest {
+
+    @Test
+    public void testInvokeRpcWithNoRemoteActor() throws Exception {
+        new JavaTestKit(node1) {{
+            CompositeNode input = makeRPCInput("foo");
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Collections.<Pair<ActorRef, Long>>emptyList()));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+        }};
+    }
+
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    //@Test
+    public void testInvokeRpc() throws URISyntaxException {
+        new JavaTestKit(node1) {{
+            QName instanceQName = new QName(new URI("ns"), "instance");
+
+            CompositeNode invokeRpcResult = makeRPCOutput("bar");
+            RpcResult<CompositeNode> rpcResult =
+                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            // invoke rpc
+            CompositeNode input = makeRPCInput("foo");
+            YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+            assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+            assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
+
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+            assertCompositeNodeEquals(input, inputCaptor.getValue());
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithNoOutput() {
+        new JavaTestKit(node1) {{
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+            assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithExecuteFailure() {
+        new JavaTestKit(node1) {{
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+                    .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
+                            new Exception("mock"))
+                    .build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 1, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+                    "error", "appTag", "info", "mock");
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithFindRoutersFailure() {
+        new JavaTestKit(node1) {{
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+        }};
+    }
+
+    @Test
+    public void testExecuteRpc() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            CompositeNode invokeRpcResult = makeRPCOutput("bar");
+            RpcResult<CompositeNode> rpcResult =
+                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithRpcErrors() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+                    .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
+                            new Exception("mock"))
+                    .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
+                    .build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 2, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
+                    "error", "appTag1", "info1", "mock");
+            assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
+                    "warning", "appTag2", "info2", null);
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithNoRpcErrors() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 1, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
+                    "operation-failed", "failed", null, null, null);
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithException() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
+                            new TestException()));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
 
-public class RpcBrokerTest {
-
-  static ActorSystem node1;
-  static ActorSystem node2;
-  private ActorRef rpcBroker1;
-  private JavaTestKit probeReg1;
-  private ActorRef rpcBroker2;
-  private JavaTestKit probeReg2;
-  private Broker.ProviderSession brokerSession;
-
-
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
-    node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(node1);
-    JavaTestKit.shutdownActorSystem(node2);
-    node1 = null;
-    node2 = null;
-  }
-
-  @Before
-  public void createActor() {
-    brokerSession = Mockito.mock(Broker.ProviderSession.class);
-    SchemaContext schemaContext = mock(SchemaContext.class);
-    probeReg1 = new JavaTestKit(node1);
-    rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
-    probeReg2 = new JavaTestKit(node2);
-    rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
-
-  }
-  @Test
-  public void testInvokeRpcError() throws Exception {
-    new JavaTestKit(node1) {{
-      QName rpc = new QName(new URI("noactor1"), "noactor1");
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-
-
-      InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
-      rpcBroker1.tell(invokeMsg, getRef());
-      probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-      probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(new ArrayList<Pair<ActorRef, Long>>()));
-
-      Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof ErrorResponse) {
-            ErrorResponse reply = (ErrorResponse)in;
-            return reply.getException().getMessage().contains("No remote actor found for rpc execution of :");
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-
-    }};
-  }
-
-
-  /**
-   * This test method invokes and executes the remote rpc
-   */
-
-  @Test
-  public void testInvokeRpc() throws URISyntaxException {
-    new JavaTestKit(node1) {{
-      QName rpc = new QName(new URI("noactor1"), "noactor1");
-      // invoke rpc
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-      InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
-      rpcBroker1.tell(invokeMsg, getRef());
-
-      probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-      List<Pair<ActorRef, Long>> routerList = new ArrayList<Pair<ActorRef, Long>>();
-
-      routerList.add(new Pair<ActorRef, Long>(rpcBroker2, 200L));
-
-      probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(routerList));
-
-      CompositeNode invokeRpcResult = mock(CompositeNode.class);
-      Collection<RpcError> errors = new ArrayList<>();
-      RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
-      Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
-      when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
-
-      //verify response msg
-      Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof RpcResponse) {
-            return true;
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-    }};
-  }
+            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+        }};
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/test-rpc.yang
new file mode 100644 (file)
index 0000000..3474e91
--- /dev/null
@@ -0,0 +1,24 @@
+module test-rpc-service {
+    yang-version 1;
+    namespace "urn:test";
+    prefix "rpc";
+
+    revision "2014-08-28" {
+        description 
+            "Initial revision";
+    }
+
+    rpc test-rpc {
+        input {
+            leaf input-data {
+                type string;
+            }
+        }
+        
+        output {
+            leaf output-data {
+                type string;
+            }
+        }
+    }
+}
\ No newline at end of file