Merge "BUG 1623 - Clustering : Parsing Error thrown on startup"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / RpcBrokerTest.java
index 392c1e637d848e1ccc47d43b3a951011a2e25a72..28b1b476cd0ebc4d20585b58a91877c076caf33c 100644 (file)
@@ -10,208 +10,268 @@ package org.opendaylight.controller.remote.rpc;
 
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
-import junit.framework.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import static org.junit.Assert.assertEquals;
 import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.ModifyAction;
-import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.any;
+
+public class RpcBrokerTest extends AbstractRpcTest {
+
+    @Test
+    public void testInvokeRpcWithNoRemoteActor() throws Exception {
+        new JavaTestKit(node1) {{
+            CompositeNode input = makeRPCInput("foo");
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Collections.<Pair<ActorRef, Long>>emptyList()));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+        }};
+    }
+
+
+    /**
+     * This test method invokes and executes the remote rpc
+     */
+    //@Test
+    public void testInvokeRpc() throws URISyntaxException {
+        new JavaTestKit(node1) {{
+            QName instanceQName = new QName(new URI("ns"), "instance");
+
+            CompositeNode invokeRpcResult = makeRPCOutput("bar");
+            RpcResult<CompositeNode> rpcResult =
+                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            // invoke rpc
+            CompositeNode input = makeRPCInput("foo");
+            YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+            assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+            assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
+
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+            assertCompositeNodeEquals(input, inputCaptor.getValue());
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithNoOutput() {
+        new JavaTestKit(node1) {{
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+            assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithExecuteFailure() {
+        new JavaTestKit(node1) {{
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+                    .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
+                            new Exception("mock"))
+                    .build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+                    Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 1, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+                    "error", "appTag", "info", "mock");
+        }};
+    }
+
+    @Test
+    public void testInvokeRpcWithFindRoutersFailure() {
+        new JavaTestKit(node1) {{
+
+            InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+            rpcBroker1.tell(invokeMsg, getRef());
+
+            probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+            probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+        }};
+    }
+
+    @Test
+    public void testExecuteRpc() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            CompositeNode invokeRpcResult = makeRPCOutput("bar");
+            RpcResult<CompositeNode> rpcResult =
+                               RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+            ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+            when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+            assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+                    XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithRpcErrors() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+                    .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
+                            new Exception("mock"))
+                    .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
+                    .build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 2, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
+                    "error", "appTag1", "info1", "mock");
+            assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
+                    "warning", "appTag2", "info2", null);
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithNoRpcErrors() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.immediateFuture(rpcResult));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
+
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
+
+            assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+            RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+            List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+            assertEquals("RpcErrors count", 1, rpcErrors.size());
+            assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
+                    "operation-failed", "failed", null, null, null);
+        }};
+    }
+
+    @Test
+    public void testExecuteRpcFailureWithException() {
+        new JavaTestKit(node1) {{
+
+            String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+            when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+                    .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
+                            new TestException()));
+
+            ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+            rpcBroker1.tell(executeMsg, getRef());
 
-public class RpcBrokerTest {
-
-  static ActorSystem system;
-
-
-  @BeforeClass
-  public static void setup() {
-    system = ActorSystem.create();
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(system);
-    system = null;
-  }
-
-  @Test
-  public void testInvokeRpcError() throws URISyntaxException {
-    new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
-      Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
-      SchemaContext schemaContext = mock(SchemaContext.class);
-      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
-      QName rpc = new QName(new URI("noactor1"), "noactor1");
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-      InvokeRpc invokeMsg = new InvokeRpc(rpc, input);
-      rpcBroker.tell(invokeMsg, getRef());
-
-      Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof ErrorResponse) {
-            ErrorResponse reply = (ErrorResponse)in;
-            return reply.getException().getMessage().contains("No remote actor found for rpc execution of :");
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-    }};
-  }
-
-  /**
-   * This test method invokes and executes the remote rpc
-   */
-
-  @Test
-  public void testInvokeRpc() throws URISyntaxException {
-    new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
-      Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
-      SchemaContext schemaContext = mock(SchemaContext.class);
-      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
-      ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1");
-      // Add RPC in table
-      QName rpc = new QName(new URI("actor1"), "actor1");
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
-      final String route = rpcBrokerRemote.path().toString();
-      AddRpc rpcMsg = new AddRpc(routeId, route);
-      rpcRegistry.tell(rpcMsg, getRef());
-      expectMsgEquals(duration("2 second"), "Success");
-
-      // invoke rpc
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-      CompositeNode invokeRpcResult = mock(CompositeNode.class);
-      Collection<RpcError> errors = new ArrayList<>();
-      RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
-      Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
-      when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
-      InvokeRpc invokeMsg = new InvokeRpc(rpc, input);
-      rpcBroker.tell(invokeMsg, getRef());
-
-      //verify response msg
-      Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof RpcResponse) {
-            return true;
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-    }};
-  }
-
-  @Test
-  public void testInvokeRoutedRpcError() throws URISyntaxException {
-    new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
-      Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
-      SchemaContext schemaContext = mock(SchemaContext.class);
-      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
-      QName rpc = new QName(new URI("actor1"), "actor1");
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-      InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(rpc)), input);
-      rpcBroker.tell(invokeMsg, getRef());
-
-      Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof ErrorResponse) {
-            ErrorResponse reply = (ErrorResponse)in;
-            return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-    }};
-  }
-
-  /**
-   * This test method invokes and executes the remote routed rpc
-   */
-
-  @Test
-  public void testInvokeRoutedRpc() throws URISyntaxException {
-    new JavaTestKit(system) {{
-      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
-      Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
-      SchemaContext schemaContext = mock(SchemaContext.class);
-      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
-      ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2");
-      // Add Routed RPC in table
-      QName rpc = new QName(new URI("actor2"), "actor2");
-      YangInstanceIdentifier identifier = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(rpc));
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, identifier);
-      final String route = rpcBrokerRemote.path().toString();
-      Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
-      routeIds.add(routeId);
-
-      AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
-      rpcRegistry.tell(rpcMsg, getRef());
-      expectMsgEquals(duration("2 second"), "Success");
-
-      // invoke rpc
-      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
-      CompositeNode invokeRpcResult = mock(CompositeNode.class);
-      Collection<RpcError> errors = new ArrayList<>();
-      RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
-      Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
-      when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
-      InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, identifier, input);
-      rpcBroker.tell(invokeMsg, getRef());
-
-      //verify response msg
-      Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
-        protected Boolean match(Object in) {
-          if (in instanceof RpcResponse) {
-            return true;
-          } else {
-            throw noMatch();
-          }
-        }
-      }.get(); // this extracts the received message
-
-      Assert.assertTrue(getMsg);
-    }};
-  }
+            akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+                    akka.actor.Status.Failure.class);
 
+            assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+        }};
+    }
 }