Bug 1637: Change Rpc actor calls to async
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / RpcBrokerTest.java
index d9a3b6a414f1666dfa3b062cf7faa54c0baed8c0..28b1b476cd0ebc4d20585b58a91877c076caf33c 100644 (file)
@@ -10,144 +10,268 @@ package org.opendaylight.controller.remote.rpc;
 
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import static org.junit.Assert.assertEquals;
 import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.ModifyAction;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Future;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.any;
+
+public class RpcBrokerTest extends AbstractRpcTest {
+
+    @Test
+    public void testInvokeRpcWithNoRemoteActor() throws Exception {
+        new JavaTestKit(node1) {{
+            CompositeNode input = makeRPCInput("foo");
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Collections.<Pair<ActorRef, Long>>emptyList()));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+        }};
+    }
+
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    //@Test
+    public void testInvokeRpc() throws URISyntaxException {
+        new JavaTestKit(node1) {{
+            QName instanceQName = new QName(new URI("ns"), "instance");
+
+            CompositeNode invokeRpcResult = makeRPCOutput("bar");
+            RpcResult<CompositeNode> rpcResult =
+                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            // invoke rpc
+            CompositeNode input = makeRPCInput("foo");
+            YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+            assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+            assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
+
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+            assertCompositeNodeEquals(input, inputCaptor.getValue());
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithNoOutput() {
+        new JavaTestKit(node1) {{
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+            assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithExecuteFailure() {
+        new JavaTestKit(node1) {{
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+                    .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
+                            new Exception("mock"))
+                    .build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 1, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+                    "error", "appTag", "info", "mock");
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithFindRoutersFailure() {
+        new JavaTestKit(node1) {{
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+        }};
+    }
+
+    @Test
+    public void testExecuteRpc() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            CompositeNode invokeRpcResult = makeRPCOutput("bar");
+            RpcResult<CompositeNode> rpcResult =
+                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithRpcErrors() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+                    .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
+                            new Exception("mock"))
+                    .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
+                    .build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 2, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
+                    "error", "appTag1", "info1", "mock");
+            assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
+                    "warning", "appTag2", "info2", null);
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithNoRpcErrors() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 1, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
+                    "operation-failed", "failed", null, null, null);
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithException() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
+                            new TestException()));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
 
-public class RpcBrokerTest {
-
-  static ActorSystem node1;
-  static ActorSystem node2;
-  private ActorRef rpcBroker1;
-  private JavaTestKit probeReg1;
-  private ActorRef rpcBroker2;
-  private JavaTestKit probeReg2;
-  private Broker.ProviderSession brokerSession;
-
-
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
-    node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(node1);
-    JavaTestKit.shutdownActorSystem(node2);
-    node1 = null;
-    node2 = null;
-  }
-
-  @Before
-  public void createActor() {
-    brokerSession = Mockito.mock(Broker.ProviderSession.class);
-    SchemaContext schemaContext = mock(SchemaContext.class);
-    probeReg1 = new JavaTestKit(node1);
-    rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
-    probeReg2 = new JavaTestKit(node2);
-    rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
-
-  }
-  @Test
-  public void testInvokeRpcError() throws Exception {
-    new JavaTestKit(node1) {{
-      QName rpc = new QName(new URI("noactor1"), "noactor1");
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-
-
-      InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
-      rpcBroker1.tell(invokeMsg, getRef());
-      probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-      probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(new ArrayList<Pair<ActorRef, Long>>()));
-
-      Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof ErrorResponse) {
-            ErrorResponse reply = (ErrorResponse)in;
-            return reply.getException().getMessage().contains("No remote actor found for rpc execution of :");
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-
-    }};
-  }
-
-
-  /**
-   * This test method invokes and executes the remote rpc
-   */
-
-  @Test
-  public void testInvokeRpc() throws URISyntaxException {
-    new JavaTestKit(node1) {{
-      QName rpc = new QName(new URI("noactor1"), "noactor1");
-      // invoke rpc
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-      InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
-      rpcBroker1.tell(invokeMsg, getRef());
-
-      probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
-      List<Pair<ActorRef, Long>> routerList = new ArrayList<Pair<ActorRef, Long>>();
-
-      routerList.add(new Pair<ActorRef, Long>(rpcBroker2, 200L));
-
-      probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(routerList));
-
-      CompositeNode invokeRpcResult = mock(CompositeNode.class);
-      Collection<RpcError> errors = new ArrayList<>();
-      RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
-      Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
-      when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
-
-      //verify response msg
-      Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof RpcResponse) {
-            return true;
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-    }};
-  }
+            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+        }};
+    }
 }