import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.japi.Pair;
import akka.testkit.JavaTestKit;
+
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
-import junit.framework.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.ModifyAction;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.any;
+
+public class RpcBrokerTest extends AbstractRpcTest {
+
+ @Test
+ public void testInvokeRpcWithNoRemoteActor() throws Exception {
+ new JavaTestKit(node1) {{
+ CompositeNode input = makeRPCInput("foo");
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, input);
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(duration("5 seconds"), RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Collections.<Pair<ActorRef, Long>>emptyList()));
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+ }};
+ }
+
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+ //@Test
+ public void testInvokeRpc() throws URISyntaxException {
+ new JavaTestKit(node1) {{
+ QName instanceQName = new QName(new URI("ns"), "instance");
+
+ CompositeNode invokeRpcResult = makeRPCOutput("bar");
+ RpcResult<CompositeNode> rpcResult =
+ RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+ ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+ when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ // invoke rpc
+ CompositeNode input = makeRPCInput("foo");
+ YangInstanceIdentifier instanceID = YangInstanceIdentifier.of(instanceQName);
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, instanceID, input);
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ FindRouters findRouters = probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ RouteIdentifier<?, ?, ?> routeIdentifier = findRouters.getRouteIdentifier();
+ assertEquals("getType", TEST_RPC, routeIdentifier.getType());
+ assertEquals("getRoute", instanceID, routeIdentifier.getRoute());
+
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+ RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+ assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+ XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+ assertCompositeNodeEquals(input, inputCaptor.getValue());
+ }};
+ }
+
+ @Test
+ public void testInvokeRpcWithNoOutput() {
+ new JavaTestKit(node1) {{
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>success().build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+ RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+ assertEquals("getResultCompositeNode", "", rpcResponse.getResultCompositeNode());
+ }};
+ }
+
+ @Test
+ public void testInvokeRpcWithExecuteFailure() {
+ new JavaTestKit(node1) {{
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+ .withError(ErrorType.RPC, "tag", "error", "appTag", "info",
+ new Exception("mock"))
+ .build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(
+ Arrays.asList(new Pair<ActorRef, Long>(rpcBroker2, 200L))));
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+ RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+ List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+ assertEquals("RpcErrors count", 1, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag",
+ "error", "appTag", "info", "mock");
+ }};
+ }
+
+ @Test
+ public void testInvokeRpcWithFindRoutersFailure() {
+ new JavaTestKit(node1) {{
+
+ InvokeRpc invokeMsg = new InvokeRpc(TEST_RPC, null, makeRPCInput("foo"));
+ rpcBroker1.tell(invokeMsg, getRef());
+
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new akka.actor.Status.Failure(new TestException()));
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testExecuteRpc() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ CompositeNode invokeRpcResult = makeRPCOutput("bar");
+ RpcResult<CompositeNode> rpcResult =
+ RpcResultBuilder.<CompositeNode>success(invokeRpcResult).build();
+ ArgumentCaptor<CompositeNode> inputCaptor = new ArgumentCaptor<>();
+ when(brokerSession.rpc(eq(TEST_RPC), inputCaptor.capture()))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
+
+ assertCompositeNodeEquals((CompositeNode)invokeRpcResult.getValue().get(0),
+ XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()));
+ }};
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithRpcErrors() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed()
+ .withError(ErrorType.RPC, "tag1", "error", "appTag1", "info1",
+ new Exception("mock"))
+ .withWarning(ErrorType.PROTOCOL, "tag2", "warning", "appTag2", "info2", null)
+ .build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+ RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+ List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+ assertEquals("RpcErrors count", 2, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC, "tag1",
+ "error", "appTag1", "info1", "mock");
+ assertRpcErrorEquals(rpcErrors.get(1), ErrorSeverity.WARNING, ErrorType.PROTOCOL, "tag2",
+ "warning", "appTag2", "info2", null);
+ }};
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithNoRpcErrors() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ RpcResult<CompositeNode> rpcResult = RpcResultBuilder.<CompositeNode>failed().build();
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.immediateFuture(rpcResult));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
+
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+
+ assertEquals("failure.cause()", RpcErrorsException.class, failure.cause().getClass());
+
+ RpcErrorsException errorsEx = (RpcErrorsException)failure.cause();
+ List<RpcError> rpcErrors = Lists.newArrayList(errorsEx.getRpcErrors());
+ assertEquals("RpcErrors count", 1, rpcErrors.size());
+ assertRpcErrorEquals(rpcErrors.get(0), ErrorSeverity.ERROR, ErrorType.RPC,
+ "operation-failed", "failed", null, null, null);
+ }};
+ }
+
+ @Test
+ public void testExecuteRpcFailureWithException() {
+ new JavaTestKit(node1) {{
+
+ String xml = "<input xmlns=\"urn:test\"><input-data>foo</input-data></input>";
+
+ when(brokerSession.rpc(eq(TEST_RPC), any(CompositeNode.class)))
+ .thenReturn(Futures.<RpcResult<CompositeNode>>immediateFailedFuture(
+ new TestException()));
+
+ ExecuteRpc executeMsg = new ExecuteRpc(xml, TEST_RPC);
+
+ rpcBroker1.tell(executeMsg, getRef());
-public class RpcBrokerTest {
-
- static ActorSystem system;
-
-
- @BeforeClass
- public static void setup() {
- system = ActorSystem.create();
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
- @Test
- public void testInvokeRpcError() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- QName rpc = new QName(new URI("actor1"), "actor1");
- InvokeRpc invokeMsg = new InvokeRpc(rpc, null);
- rpcBroker.tell(invokeMsg, getRef());
-
- Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
- protected Boolean match(Object in) {
- if (in instanceof ErrorResponse) {
- ErrorResponse reply = (ErrorResponse)in;
- return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
- }};
- }
-
- /**
- * This test method invokes and executes the remote rpc
- */
-
- @Test
- public void testInvokeRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1");
- // Add RPC in table
- QName rpc = new QName(new URI("actor1"), "actor1");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
- final String route = rpcBrokerRemote.path().toString();
- AddRpc rpcMsg = new AddRpc(routeId, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
-
- // invoke rpc
- CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
- CompositeNode invokeRpcResult = mock(CompositeNode.class);
- Collection<RpcError> errors = new ArrayList<>();
- RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
- Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
- when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
- InvokeRpc invokeMsg = new InvokeRpc(rpc, input);
- rpcBroker.tell(invokeMsg, getRef());
-
- //verify response msg
- Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
- protected Boolean match(Object in) {
- if (in instanceof RpcResponse) {
- return true;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
- }};
- }
-
- @Test
- public void testInvokeRoutedRpcError() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- QName rpc = new QName(new URI("actor1"), "actor1");
- InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, null);
- rpcBroker.tell(invokeMsg, getRef());
-
- Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
- protected Boolean match(Object in) {
- if (in instanceof ErrorResponse) {
- ErrorResponse reply = (ErrorResponse)in;
- return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
- }};
- }
-
- /**
- * This test method invokes and executes the remote routed rpc
- */
-
- @Test
- public void testInvokeRoutedRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2");
- // Add Routed RPC in table
- QName rpc = new QName(new URI("actor2"), "actor2");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
- final String route = rpcBrokerRemote.path().toString();
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
- routeIds.add(routeId);
-
- AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
-
- // invoke rpc
- CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
- CompositeNode invokeRpcResult = mock(CompositeNode.class);
- Collection<RpcError> errors = new ArrayList<>();
- RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
- Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
- when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
- InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, input);
- rpcBroker.tell(invokeMsg, getRef());
-
- //verify response msg
- Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
- protected Boolean match(Object in) {
- if (in instanceof RpcResponse) {
- return true;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
- }};
- }
+ akka.actor.Status.Failure failure = expectMsgClass(duration("5 seconds"),
+ akka.actor.Status.Failure.class);
+ assertEquals("failure.cause()", TestException.class, failure.cause().getClass());
+ }};
+ }
}