import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.japi.Pair;
import akka.testkit.JavaTestKit;
import com.google.common.util.concurrent.Futures;
+import com.typesafe.config.ConfigFactory;
import junit.framework.Assert;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
import java.util.concurrent.Future;
import static org.mockito.Mockito.mock;
public class RpcBrokerTest {
- static ActorSystem system;
+ static ActorSystem node1;
+ static ActorSystem node2;
+ private ActorRef rpcBroker1;
+ private JavaTestKit probeReg1;
+ private ActorRef rpcBroker2;
+ private JavaTestKit probeReg2;
+ private Broker.ProviderSession brokerSession;
@BeforeClass
- public static void setup() {
- system = ActorSystem.create();
+ public static void setup() throws InterruptedException {
+ node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
+ node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
}
@AfterClass
public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
+ JavaTestKit.shutdownActorSystem(node1);
+ JavaTestKit.shutdownActorSystem(node2);
+ node1 = null;
+ node2 = null;
}
+ @Before
+ public void createActor() {
+ brokerSession = Mockito.mock(Broker.ProviderSession.class);
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ probeReg1 = new JavaTestKit(node1);
+ rpcBroker1 = node1.actorOf(RpcBroker.props(brokerSession, probeReg1.getRef(), schemaContext));
+ probeReg2 = new JavaTestKit(node2);
+ rpcBroker2 = node2.actorOf(RpcBroker.props(brokerSession, probeReg2.getRef(), schemaContext));
+
+ }
@Test
- public void testInvokeRpcError() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- QName rpc = new QName(new URI("actor1"), "actor1");
- InvokeRpc invokeMsg = new InvokeRpc(rpc, null);
- rpcBroker.tell(invokeMsg, getRef());
+ public void testInvokeRpcError() throws Exception {
+ new JavaTestKit(node1) {{
+ QName rpc = new QName(new URI("noactor1"), "noactor1");
+ CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "no child"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
+
+
+ InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
+ rpcBroker1.tell(invokeMsg, getRef());
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(new ArrayList<Pair<ActorRef, Long>>()));
Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
protected Boolean match(Object in) {
if (in instanceof ErrorResponse) {
ErrorResponse reply = (ErrorResponse)in;
- return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
+ return reply.getException().getMessage().contains("No remote actor found for rpc execution of :");
} else {
throw noMatch();
}
}.get(); // this extracts the received message
Assert.assertTrue(getMsg);
+
}};
}
+
/**
* This test method invokes and executes the remote rpc
*/
@Test
public void testInvokeRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1");
- // Add RPC in table
- QName rpc = new QName(new URI("actor1"), "actor1");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
- final String route = rpcBrokerRemote.path().toString();
- AddRpc rpcMsg = new AddRpc(routeId, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
-
+ new JavaTestKit(node1) {{
+ QName rpc = new QName(new URI("noactor1"), "noactor1");
// invoke rpc
CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
- CompositeNode invokeRpcResult = mock(CompositeNode.class);
- Collection<RpcError> errors = new ArrayList<>();
- RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
- Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
- when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
- InvokeRpc invokeMsg = new InvokeRpc(rpc, input);
- rpcBroker.tell(invokeMsg, getRef());
-
- //verify response msg
- Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
- protected Boolean match(Object in) {
- if (in instanceof RpcResponse) {
- return true;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- Assert.assertTrue(getMsg);
- }};
- }
-
- @Test
- public void testInvokeRoutedRpcError() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- QName rpc = new QName(new URI("actor1"), "actor1");
- InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, null);
- rpcBroker.tell(invokeMsg, getRef());
-
- Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
- protected Boolean match(Object in) {
- if (in instanceof ErrorResponse) {
- ErrorResponse reply = (ErrorResponse)in;
- return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ InvokeRpc invokeMsg = new InvokeRpc(rpc, null, input);
+ rpcBroker1.tell(invokeMsg, getRef());
- Assert.assertTrue(getMsg);
- }};
- }
+ probeReg1.expectMsgClass(RpcRegistry.Messages.FindRouters.class);
+ List<Pair<ActorRef, Long>> routerList = new ArrayList<Pair<ActorRef, Long>>();
- /**
- * This test method invokes and executes the remote routed rpc
- */
+ routerList.add(new Pair<ActorRef, Long>(rpcBroker2, 200L));
- @Test
- public void testInvokeRoutedRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
- Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
- SchemaContext schemaContext = mock(SchemaContext.class);
- ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
- ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2");
- // Add Routed RPC in table
- QName rpc = new QName(new URI("actor2"), "actor2");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
- final String route = rpcBrokerRemote.path().toString();
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
- routeIds.add(routeId);
-
- AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
+ probeReg1.reply(new RpcRegistry.Messages.FindRoutersReply(routerList));
- // invoke rpc
- CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
CompositeNode invokeRpcResult = mock(CompositeNode.class);
Collection<RpcError> errors = new ArrayList<>();
RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
- InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, input);
- rpcBroker.tell(invokeMsg, getRef());
//verify response msg
Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
Assert.assertTrue(getMsg);
}};
}
-
}