X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistryTest.java;h=e6793741a3ec2ef9030a2e469058ab92de93c153;hb=d6f1e7790157461553b26ec82d246e68b62aad6b;hp=d011d331a684e4f0bcbbbf395e18145364dfdf4a;hpb=244d226cc66672e2e15d0b557bd1af37153d8065;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index d011d331a6..e6793741a3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,159 +1,286 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.ChildActorPath; +import akka.actor.Props; import akka.testkit.JavaTestKit; -import junit.framework.Assert; +import com.google.common.base.Predicate; +import com.typesafe.config.ConfigFactory; + +import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.utils.ConditionalProbe; import org.opendaylight.yangtools.yang.common.QName; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import javax.annotation.Nullable; import java.net.URI; import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; public class RpcRegistryTest { - static ActorSystem system; + private static ActorSystem node1; + private static ActorSystem node2; + private static ActorSystem node3; + private ActorRef registry1; + private ActorRef registry2; + private ActorRef registry3; @BeforeClass - public static void setup() { - system = ActorSystem.create(); + public static void setup() throws InterruptedException { + node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); + node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); + node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC")); } @AfterClass public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; + JavaTestKit.shutdownActorSystem(node1); + JavaTestKit.shutdownActorSystem(node2); + JavaTestKit.shutdownActorSystem(node3); + if (node1 != null) + node1.shutdown(); + if (node2 != null) + node2.shutdown(); + if (node3 != null) + node3.shutdown(); + + } + + @Before + public void createRpcRegistry() throws InterruptedException { + registry1 = node1.actorOf(Props.create(RpcRegistry.class)); + registry2 = node2.actorOf(Props.create(RpcRegistry.class)); + registry3 = node3.actorOf(Props.create(RpcRegistry.class)); + } + + @After + public void stopRpcRegistry() throws InterruptedException { + if (registry1 != null) + node1.stop(registry1); + if (registry2 != null) + node2.stop(registry2); + if (registry3 != null) + node3.stop(registry3); } /** - This test add, read and remove an entry in global rpc + * One node cluster. + * 1. Register rpc, ensure router can be found + * 2. Then remove rpc, ensure its deleted + * + * @throws URISyntaxException + * @throws InterruptedException */ @Test - public void testGlobalRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - AddRpc rpcMsg = new AddRpc(routeId, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRpc getRpc = new GetRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException { + validateSystemStartup(); - Assert.assertTrue(getMsg); + final JavaTestKit mockBroker = new JavaTestKit(node1); - RemoveRpc removeMsg = new RemoveRpc(routeId); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store"); - rpcRegistry.tell(getRpc, getRef()); + //install probe + final JavaTestKit probe1 = createProbeForMessage( + node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class); + + //Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); + registry1.tell(getAddRouteMessage(), mockBroker.getRef()); + + //Bucket store should get an update bucket message. Updated bucket contains added rpc. + probe1.expectMsgClass( + FiniteDuration.apply(10, TimeUnit.SECONDS), + Messages.BucketStoreMessages.UpdateBucket.class); + + //Now remove rpc + registry1.tell(getRemoveRouteMessage(), mockBroker.getRef()); + + //Bucket store should get an update bucket message. Rpc is removed in the updated bucket + probe1.expectMsgClass( + FiniteDuration.apply(10, TimeUnit.SECONDS), + Messages.BucketStoreMessages.UpdateBucket.class); - Boolean getNullMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } - } - }.get(); - Assert.assertTrue(getNullMsg); - }}; } + /** - This test add, read and remove an entry in routed rpc + * Three node cluster. + * 1. Register rpc on 1 node, ensure 2nd node gets updated + * 2. Remove rpc on 1 node, ensure 2nd node gets updated + * + * @throws URISyntaxException + * @throws InterruptedException */ @Test - public void testRoutedRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - Set> routeIds = new HashSet<>(); - routeIds.add(routeId); - - AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRoutedRpc getRpc = new GetRoutedRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException { + + validateSystemStartup(); + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + + //install probe on node2's bucket store + final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store"); + final JavaTestKit probe2 = createProbeForMessage( + node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + + //Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + + //Bucket store on node2 should get a message to update its local copy of remote buckets + probe2.expectMsgClass( + FiniteDuration.apply(10, TimeUnit.SECONDS), + Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + //Now remove + registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef()); + + //Bucket store on node2 should get a message to update its local copy of remote buckets + probe2.expectMsgClass( + FiniteDuration.apply(10, TimeUnit.SECONDS), + Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + } + + /** + * Three node cluster. + * Register rpc on 2 nodes. Ensure 3rd gets updated. + * + * @throws Exception + */ + @Test + public void testRpcAddedOnMultiNodes() throws Exception { + + validateSystemStartup(); + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + final JavaTestKit mockBroker3 = new JavaTestKit(node3); + + registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); + + //install probe on node 3 + final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store"); + final JavaTestKit probe3 = createProbeForMessage( + node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class); - Assert.assertTrue(getMsg); - RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + //Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); - rpcRegistry.tell(getRpc, getRef()); + probe3.expectMsgClass( + FiniteDuration.apply(10, TimeUnit.SECONDS), + Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + + //Add same rpc on node 2 + registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); + registry2.tell(getAddRouteMessage(), mockBroker2.getRef()); + + probe3.expectMsgClass( + FiniteDuration.apply(10, TimeUnit.SECONDS), + Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + } - Boolean getNullMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); + private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) { + final JavaTestKit probe = new JavaTestKit(node); + + ConditionalProbe conditionalProbe = + new ConditionalProbe(probe.getRef(), new Predicate() { + @Override + public boolean apply(@Nullable Object input) { + return clazz.equals(input.getClass()); } + }); + + ActorSelection subject = node.actorSelection(subjectPath); + subject.tell(conditionalProbe, ActorRef.noSender()); + + return probe; + + } + + private void validateSystemStartup() throws InterruptedException { + + ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper"); + ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper"); + ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper"); + + ActorSelection gossiper1 = node1.actorSelection(gossiper1Path); + ActorSelection gossiper2 = node2.actorSelection(gossiper2Path); + ActorSelection gossiper3 = node3.actorSelection(gossiper3Path); + + + if (!resolveReference(gossiper1, gossiper2, gossiper3)) + Assert.fail("Could not find gossipers"); + } + + private Boolean resolveReference(ActorSelection... gossipers) { + + Boolean resolved = true; + for (int i = 0; i < 5; i++) { + + resolved = true; + System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i); + + for (ActorSelection gossiper : gossipers) { + ActorRef ref = null; + + try { + Future future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS)); + ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS)); + } catch (Exception e) { + System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage()); } - }.get(); - Assert.assertTrue(getNullMsg); - }}; + if (ref == null) + resolved = false; + } + + if (resolved) break; + + } + return resolved; + } + + private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException { + return new AddOrUpdateRoutes(createRouteIds()); + } + + private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException { + return new RemoveRoutes(createRouteIds()); + } + + private List> createRouteIds() throws URISyntaxException { + QName type = new QName(new URI("/mockrpc"), "mockrpc"); + List> routeIds = new ArrayList<>(); + routeIds.add(new RouteIdentifierImpl(null, type, null)); + return routeIds; } }