X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistryTest.java;h=84062735c22ddf5a3cbd6a40e74e64466d745e5d;hp=5b7b7e4fdc61dfaaa2377078853b691b48b07394;hb=5b66dd8f5e3467a07e77b20fe696b29993ce5565;hpb=d80bf0f81bdeed907b290b67f26f1a3541ad3ea4 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index 5b7b7e4fdc..84062735c2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,19 +1,40 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.remote.rpc.registry; +import static org.junit.Assert.fail; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; -import akka.actor.Props; -import akka.japi.Pair; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.UniqueAddress; import akka.testkit.JavaTestKit; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.AfterClass; @@ -21,30 +42,34 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; public class RpcRegistryTest { + private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryTest.class); private static ActorSystem node1; private static ActorSystem node2; private static ActorSystem node3; + private JavaTestKit invoker1; + private JavaTestKit invoker2; + private JavaTestKit invoker3; + private JavaTestKit registrar1; + private JavaTestKit registrar2; + private JavaTestKit registrar3; private ActorRef registry1; private ActorRef registry2; private ActorRef registry3; @@ -53,26 +78,62 @@ public class RpcRegistryTest { @BeforeClass public static void staticSetup() throws InterruptedException { - RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); - RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); - RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build(); - node1 = ActorSystem.create("opendaylight-rpc", config1.get()); - node2 = ActorSystem.create("opendaylight-rpc", config2.get()); - node3 = ActorSystem.create("opendaylight-rpc", config3.get()); + AkkaConfigurationReader reader = ConfigFactory::load; + + RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + node1 = ActorSystem.create("opendaylight-rpc", config1.get()); + node2 = ActorSystem.create("opendaylight-rpc", config2.get()); + node3 = ActorSystem.create("opendaylight-rpc", config3.get()); + + waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + } + + static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) { + Set otherMembersSet = Sets.newHashSet(addresses); + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 10) { + CurrentClusterState state = Cluster.get(node).state(); + for (Member m : state.getMembers()) { + if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) + && otherMembersSet.isEmpty()) { + return; + } + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + fail("Member(s) " + otherMembersSet + " are not Up"); } @AfterClass public static void staticTeardown() { - JavaTestKit.shutdownActorSystem(node1); - JavaTestKit.shutdownActorSystem(node2); - JavaTestKit.shutdownActorSystem(node3); + JavaTestKit.shutdownActorSystem(node1); + JavaTestKit.shutdownActorSystem(node2); + JavaTestKit.shutdownActorSystem(node3); } @Before public void setup() { - registry1 = node1.actorOf(Props.create(RpcRegistry.class)); - registry2 = node2.actorOf(Props.create(RpcRegistry.class)); - registry3 = node3.actorOf(Props.create(RpcRegistry.class)); + invoker1 = new JavaTestKit(node1); + registrar1 = new JavaTestKit(node1); + registry1 = node1.actorOf(RpcRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef())); + invoker2 = new JavaTestKit(node2); + registrar2 = new JavaTestKit(node2); + registry2 = node2.actorOf(RpcRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef())); + invoker3 = new JavaTestKit(node3); + registrar3 = new JavaTestKit(node3); + registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef())); + } + + private static RemoteRpcProviderConfig config(final ActorSystem node) { + return new RemoteRpcProviderConfig(node.settings().config()); } @After @@ -86,102 +147,110 @@ public class RpcRegistryTest { if (registry3 != null) { node3.stop(registry3); } + + if (invoker1 != null) { + node1.stop(invoker1.getRef()); + } + if (invoker2 != null) { + node2.stop(invoker2.getRef()); + } + if (invoker3 != null) { + node3.stop(invoker3.getRef()); + } + + if (registrar1 != null) { + node1.stop(registrar1.getRef()); + } + if (registrar2 != null) { + node2.stop(registrar2.getRef()); + } + if (registrar3 != null) { + node3.stop(registrar3.getRef()); + } } /** * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its * deleted - * - * @throws URISyntaxException - * @throws InterruptedException */ @Test public void testAddRemoveRpcOnSameNode() throws Exception { - - System.out.println("testAddRemoveRpcOnSameNode starting"); - - final JavaTestKit mockBroker = new JavaTestKit(node1); + LOG.info("testAddRemoveRpcOnSameNode starting"); Address nodeAddress = node1.provider().getDefaultAddress(); // Add rpc on node 1 - registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); - List> addedRouteIds = createRouteIds(); + List addedRouteIds = createRouteIds(); - registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender()); // Bucket store should get an update bucket message. Updated bucket contains added rpc. + final JavaTestKit testKit = new JavaTestKit(node1); - Map buckets = retrieveBuckets(registry1, mockBroker, nodeAddress); + Map> buckets = retrieveBuckets(registry1, testKit, nodeAddress); verifyBucket(buckets.get(nodeAddress), addedRouteIds); - Map versions = retrieveVersions(registry1, mockBroker); - Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(), + Map versions = retrieveVersions(registry1, testKit); + Assert.assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(), versions.get(nodeAddress)); // Now remove rpc - registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender()); // Bucket store should get an update bucket message. Rpc is removed in the updated bucket - verifyEmptyBucket(mockBroker, registry1, nodeAddress); + verifyEmptyBucket(testKit, registry1, nodeAddress); - System.out.println("testAddRemoveRpcOnSameNode ending"); + LOG.info("testAddRemoveRpcOnSameNode ending"); } /** * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on * 1 node, ensure 2nd node gets updated - * - * @throws URISyntaxException - * @throws InterruptedException */ @Test public void testRpcAddRemoveInCluster() throws Exception { - System.out.println("testRpcAddRemoveInCluster starting"); + LOG.info("testRpcAddRemoveInCluster starting"); - final JavaTestKit mockBroker1 = new JavaTestKit(node1); - final JavaTestKit mockBroker2 = new JavaTestKit(node2); - - List> addedRouteIds = createRouteIds(); + List addedRouteIds = createRouteIds(); Address node1Address = node1.provider().getDefaultAddress(); // Add rpc on node 1 - registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender()); // Bucket store on node2 should get a message to update its local copy of remote buckets + final JavaTestKit testKit = new JavaTestKit(node2); - Map buckets = retrieveBuckets(registry2, mockBroker2, node1Address); + Map> buckets = retrieveBuckets(registry2, testKit, node1Address); verifyBucket(buckets.get(node1Address), addedRouteIds); // Now remove - registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender()); // Bucket store on node2 should get a message to update its local copy of remote buckets. // Wait for the bucket for node1 to be empty. - verifyEmptyBucket(mockBroker2, registry2, node1Address); + verifyEmptyBucket(testKit, registry2, node1Address); - System.out.println("testRpcAddRemoveInCluster ending"); + LOG.info("testRpcAddRemoveInCluster ending"); } - private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address) + private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address) throws AssertionError { - Map buckets; - int nTries = 0; - while(true) { + Map> buckets; + int numTries = 0; + while (true) { buckets = retrieveBuckets(registry1, testKit, address); try { - verifyBucket(buckets.get(address), Collections.>emptyList()); + verifyBucket(buckets.get(address), Collections.emptyList()); break; } catch (AssertionError e) { - if(++nTries >= 50) { + if (++numTries >= 50) { throw e; } } @@ -192,68 +261,74 @@ public class RpcRegistryTest { /** * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated. - * - * @throws Exception */ @Test public void testRpcAddedOnMultiNodes() throws Exception { - - final JavaTestKit mockBroker1 = new JavaTestKit(node1); - final JavaTestKit mockBroker2 = new JavaTestKit(node2); - final JavaTestKit mockBroker3 = new JavaTestKit(node3); - - registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); + final JavaTestKit testKit = new JavaTestKit(node3); // Add rpc on node 1 - List> addedRouteIds1 = createRouteIds(); - registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef()); + List addedRouteIds1 = createRouteIds(); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender()); + + final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + UpdateRemoteEndpoints.class); // Add rpc on node 2 - List> addedRouteIds2 = createRouteIds(); - registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); - registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef()); + List addedRouteIds2 = createRouteIds(); + registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender()); - Address node1Address = node1.provider().getDefaultAddress(); + final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + UpdateRemoteEndpoints.class); Address node2Address = node2.provider().getDefaultAddress(); + Address node1Address = node1.provider().getDefaultAddress(); - Map buckets = retrieveBuckets(registry3, mockBroker3, node1Address, + Map> buckets = retrieveBuckets(registry3, testKit, node1Address, node2Address); verifyBucket(buckets.get(node1Address), addedRouteIds1); verifyBucket(buckets.get(node2Address), addedRouteIds2); - Map versions = retrieveVersions(registry3, mockBroker3); - Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(), + Map versions = retrieveVersions(registry3, testKit); + Assert.assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(), versions.get(node1Address)); - Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(), + Assert.assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(), versions.get(node2Address)); - RouteIdentifier routeID = addedRouteIds1.get(0); - registry3.tell(new FindRouters(routeID), mockBroker3.getRef()); + assertEndpoints(req1, node1Address, invoker1); + assertEndpoints(req2, node2Address, invoker2); - FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), - FindRoutersReply.class); + } + + private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address, + final JavaTestKit invoker) { + final Map> endpoints = msg.getEndpoints(); + Assert.assertEquals(1, endpoints.size()); - List> respList = reply.getRouterWithUpdateTime(); - Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); + final Optional maybeEndpoint = endpoints.get(address); + Assert.assertNotNull(maybeEndpoint); + Assert.assertTrue(maybeEndpoint.isPresent()); - respList.get(0).first().tell("hello", ActorRef.noSender()); - mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello"); + final RemoteRpcEndpoint endpoint = maybeEndpoint.get(); + final ActorRef router = endpoint.getRouter(); + Assert.assertNotNull(router); + + router.tell("hello", ActorRef.noSender()); + final String s = invoker.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), String.class); + Assert.assertEquals("hello", s); } - private Map retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) { - bucketStore.tell(new GetBucketVersions(), testKit.getRef()); - GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), - GetBucketVersionsReply.class); - return reply.getVersions(); + private static Map retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) { + bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef()); + @SuppressWarnings("unchecked") + final Map reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class); + return reply; } - private void verifyBucket(Bucket bucket, List> expRouteIds) { + private static void verifyBucket(final Bucket bucket, final List expRouteIds) { RoutingTable table = bucket.getData(); Assert.assertNotNull("Bucket RoutingTable is null", table); - for(RouteIdentifier r: expRouteIds) { - if(!table.contains(r)) { + for (DOMRpcIdentifier r : expRouteIds) { + if (!table.contains(r)) { Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table); } } @@ -261,29 +336,29 @@ public class RpcRegistryTest { Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size()); } - private Map retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit, - Address... addresses) { - int nTries = 0; - while(true) { - bucketStore.tell(new GetAllBuckets(), testKit.getRef()); - GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), - GetAllBucketsReply.class); + private static Map> retrieveBuckets(final ActorRef bucketStore, + final JavaTestKit testKit, final Address... addresses) { + int numTries = 0; + while (true) { + bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef()); + @SuppressWarnings("unchecked") + Map> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + Map.class); - Map buckets = reply.getBuckets(); boolean foundAll = true; - for(Address addr: addresses) { - Bucket bucket = buckets.get(addr); - if(bucket == null) { + for (Address addr : addresses) { + Bucket bucket = buckets.get(addr); + if (bucket == null) { foundAll = false; break; } } - if(foundAll) { + if (foundAll) { return buckets; } - if(++nTries >= 50) { + if (++numTries >= 50) { Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + ", Actual: " + buckets); } @@ -292,43 +367,40 @@ public class RpcRegistryTest { } } - @SuppressWarnings("unchecked") @Test public void testAddRoutesConcurrency() throws Exception { final JavaTestKit testKit = new JavaTestKit(node1); - registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender()); - final int nRoutes = 500; - final RouteIdentifier[] added = new RouteIdentifier[nRoutes]; - for(int i = 0; i < nRoutes; i++) { - final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, - new QName(new URI("/mockrpc"), "type" + i), null); - added[i] = routeId; + final Collection added = new ArrayList<>(nRoutes); + for (int i = 0; i < nRoutes; i++) { + final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true, + new QName(new URI("/mockrpc"), "type" + i))); + added.add(routeId); //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - registry1.tell(new AddOrUpdateRoutes(Arrays.>asList(routeId)), + registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)), ActorRef.noSender()); } - GetAllBuckets getAllBuckets = new GetAllBuckets(); FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); - int nTries = 0; - while(true) { - registry1.tell(getAllBuckets, testKit.getRef()); - GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class); + int numTries = 0; + while (true) { + registry1.tell(GET_ALL_BUCKETS, testKit.getRef()); + @SuppressWarnings("unchecked") + Map> buckets = testKit.expectMsgClass(duration, Map.class); - Bucket localBucket = reply.getBuckets().values().iterator().next(); + Bucket localBucket = buckets.values().iterator().next(); RoutingTable table = localBucket.getData(); - if(table != null && table.size() == nRoutes) { - for(RouteIdentifier r: added) { - Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r)); + if (table != null && table.size() == nRoutes) { + for (DOMRpcIdentifier r : added) { + Assert.assertTrue("RoutingTable contains " + r, table.contains(r)); } break; } - if(++nTries >= 50) { + if (++numTries >= 50) { Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); } @@ -336,10 +408,10 @@ public class RpcRegistryTest { } } - private List> createRouteIds() throws URISyntaxException { + private List createRouteIds() throws URISyntaxException { QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++); - List> routeIds = new ArrayList<>(); - routeIds.add(new RouteIdentifierImpl(null, type, null)); + List routeIds = new ArrayList<>(1); + routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type))); return routeIds; } }