X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistryTest.java;h=5b7b7e4fdc61dfaaa2377078853b691b48b07394;hp=d011d331a684e4f0bcbbbf395e18145364dfdf4a;hb=c083d5c6ef874e38eebca402105e740f3614d6f7;hpb=244d226cc66672e2e15d0b557bd1af37153d8065 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index d011d331a6..5b7b7e4fdc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,159 +1,345 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.Props; +import akka.japi.Pair; import akka.testkit.JavaTestKit; -import junit.framework.Assert; +import com.google.common.util.concurrent.Uninterruptibles; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.opendaylight.yangtools.yang.common.QName; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; public class RpcRegistryTest { - static ActorSystem system; - - - @BeforeClass - public static void setup() { - system = ActorSystem.create(); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - /** - This test add, read and remove an entry in global rpc - */ - @Test - public void testGlobalRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - AddRpc rpcMsg = new AddRpc(routeId, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRpc getRpc = new GetRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } + private static ActorSystem node1; + private static ActorSystem node2; + private static ActorSystem node3; + + private ActorRef registry1; + private ActorRef registry2; + private ActorRef registry3; + + private int routeIdCounter = 1; + + @BeforeClass + public static void staticSetup() throws InterruptedException { + RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); + RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); + RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build(); + node1 = ActorSystem.create("opendaylight-rpc", config1.get()); + node2 = ActorSystem.create("opendaylight-rpc", config2.get()); + node3 = ActorSystem.create("opendaylight-rpc", config3.get()); + } + + @AfterClass + public static void staticTeardown() { + JavaTestKit.shutdownActorSystem(node1); + JavaTestKit.shutdownActorSystem(node2); + JavaTestKit.shutdownActorSystem(node3); + } + + @Before + public void setup() { + registry1 = node1.actorOf(Props.create(RpcRegistry.class)); + registry2 = node2.actorOf(Props.create(RpcRegistry.class)); + registry3 = node3.actorOf(Props.create(RpcRegistry.class)); + } + + @After + public void teardown() { + if (registry1 != null) { + node1.stop(registry1); + } + if (registry2 != null) { + node2.stop(registry2); } - }.get(); // this extracts the received message + if (registry3 != null) { + node3.stop(registry3); + } + } + + /** + * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its + * deleted + * + * @throws URISyntaxException + * @throws InterruptedException + */ + @Test + public void testAddRemoveRpcOnSameNode() throws Exception { + + System.out.println("testAddRemoveRpcOnSameNode starting"); + + final JavaTestKit mockBroker = new JavaTestKit(node1); + + Address nodeAddress = node1.provider().getDefaultAddress(); + + // Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); + + List> addedRouteIds = createRouteIds(); + + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef()); + + // Bucket store should get an update bucket message. Updated bucket contains added rpc. + + Map buckets = retrieveBuckets(registry1, mockBroker, nodeAddress); + verifyBucket(buckets.get(nodeAddress), addedRouteIds); + + Map versions = retrieveVersions(registry1, mockBroker); + Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(), + versions.get(nodeAddress)); + + // Now remove rpc + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef()); + + // Bucket store should get an update bucket message. Rpc is removed in the updated bucket + + verifyEmptyBucket(mockBroker, registry1, nodeAddress); + + System.out.println("testAddRemoveRpcOnSameNode ending"); + + } + + /** + * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on + * 1 node, ensure 2nd node gets updated + * + * @throws URISyntaxException + * @throws InterruptedException + */ + @Test + public void testRpcAddRemoveInCluster() throws Exception { + + System.out.println("testRpcAddRemoveInCluster starting"); + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + + List> addedRouteIds = createRouteIds(); + + Address node1Address = node1.provider().getDefaultAddress(); - Assert.assertTrue(getMsg); + // Add rpc on node 1 + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef()); - RemoveRpc removeMsg = new RemoveRpc(routeId); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + // Bucket store on node2 should get a message to update its local copy of remote buckets - rpcRegistry.tell(getRpc, getRef()); + Map buckets = retrieveBuckets(registry2, mockBroker2, node1Address); + verifyBucket(buckets.get(node1Address), addedRouteIds); - Boolean getNullMsg = new ExpectMsg("GetRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRpcReply) { - GetRpcReply reply = (GetRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } + // Now remove + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef()); + + // Bucket store on node2 should get a message to update its local copy of remote buckets. + // Wait for the bucket for node1 to be empty. + + verifyEmptyBucket(mockBroker2, registry2, node1Address); + + System.out.println("testRpcAddRemoveInCluster ending"); + } + + private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address) + throws AssertionError { + Map buckets; + int nTries = 0; + while(true) { + buckets = retrieveBuckets(registry1, testKit, address); + + try { + verifyBucket(buckets.get(address), Collections.>emptyList()); + break; + } catch (AssertionError e) { + if(++nTries >= 50) { + throw e; + } + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } - }.get(); - Assert.assertTrue(getNullMsg); - }}; - - } - - /** - This test add, read and remove an entry in routed rpc - */ - @Test - public void testRoutedRpc() throws URISyntaxException { - new JavaTestKit(system) {{ - ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); - QName type = new QName(new URI("actor1"), "actor1"); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); - final String route = "actor1"; - - Set> routeIds = new HashSet<>(); - routeIds.add(routeId); - - AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); - rpcRegistry.tell(rpcMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); - - GetRoutedRpc getRpc = new GetRoutedRpc(routeId); - rpcRegistry.tell(getRpc, getRef()); - - Boolean getMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return route.equals(reply.getRoutePath()); - } else { - throw noMatch(); - } + } + + /** + * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated. + * + * @throws Exception + */ + @Test + public void testRpcAddedOnMultiNodes() throws Exception { + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + final JavaTestKit mockBroker3 = new JavaTestKit(node3); + + registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); + + // Add rpc on node 1 + List> addedRouteIds1 = createRouteIds(); + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef()); + + // Add rpc on node 2 + List> addedRouteIds2 = createRouteIds(); + registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); + registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef()); + + Address node1Address = node1.provider().getDefaultAddress(); + Address node2Address = node2.provider().getDefaultAddress(); + + Map buckets = retrieveBuckets(registry3, mockBroker3, node1Address, + node2Address); + + verifyBucket(buckets.get(node1Address), addedRouteIds1); + verifyBucket(buckets.get(node2Address), addedRouteIds2); + + Map versions = retrieveVersions(registry3, mockBroker3); + Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(), + versions.get(node1Address)); + Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(), + versions.get(node2Address)); + + RouteIdentifier routeID = addedRouteIds1.get(0); + registry3.tell(new FindRouters(routeID), mockBroker3.getRef()); + + FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + FindRoutersReply.class); + + List> respList = reply.getRouterWithUpdateTime(); + Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); + + respList.get(0).first().tell("hello", ActorRef.noSender()); + mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello"); + } + + private Map retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) { + bucketStore.tell(new GetBucketVersions(), testKit.getRef()); + GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetBucketVersionsReply.class); + return reply.getVersions(); + } + + private void verifyBucket(Bucket bucket, List> expRouteIds) { + RoutingTable table = bucket.getData(); + Assert.assertNotNull("Bucket RoutingTable is null", table); + for(RouteIdentifier r: expRouteIds) { + if(!table.contains(r)) { + Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table); + } + } + + Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size()); + } + + private Map retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit, + Address... addresses) { + int nTries = 0; + while(true) { + bucketStore.tell(new GetAllBuckets(), testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetAllBucketsReply.class); + + Map buckets = reply.getBuckets(); + boolean foundAll = true; + for(Address addr: addresses) { + Bucket bucket = buckets.get(addr); + if(bucket == null) { + foundAll = false; + break; + } + } + + if(foundAll) { + return buckets; + } + + if(++nTries >= 50) { + Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + + ", Actual: " + buckets); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } - }.get(); // this extracts the received message + } - Assert.assertTrue(getMsg); + @SuppressWarnings("unchecked") + @Test + public void testAddRoutesConcurrency() throws Exception { + final JavaTestKit testKit = new JavaTestKit(node1); - RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route); - rpcRegistry.tell(removeMsg, getRef()); - expectMsgEquals(duration("2 second"), "Success"); + registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender()); - rpcRegistry.tell(getRpc, getRef()); + final int nRoutes = 500; + final RouteIdentifier[] added = new RouteIdentifier[nRoutes]; + for(int i = 0; i < nRoutes; i++) { + final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, + new QName(new URI("/mockrpc"), "type" + i), null); + added[i] = routeId; - Boolean getNullMsg = new ExpectMsg("GetRoutedRpcReply") { - protected Boolean match(Object in) { - if (in instanceof GetRoutedRpcReply) { - GetRoutedRpcReply reply = (GetRoutedRpcReply)in; - return reply.getRoutePath() == null; - } else { - throw noMatch(); - } + //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + registry1.tell(new AddOrUpdateRoutes(Arrays.>asList(routeId)), + ActorRef.noSender()); } - }.get(); - Assert.assertTrue(getNullMsg); - }}; - } + GetAllBuckets getAllBuckets = new GetAllBuckets(); + FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); + int nTries = 0; + while(true) { + registry1.tell(getAllBuckets, testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class); + + Bucket localBucket = reply.getBuckets().values().iterator().next(); + RoutingTable table = localBucket.getData(); + if(table != null && table.size() == nRoutes) { + for(RouteIdentifier r: added) { + Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r)); + } + + break; + } + + if(++nTries >= 50) { + Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + private List> createRouteIds() throws URISyntaxException { + QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++); + List> routeIds = new ArrayList<>(); + routeIds.add(new RouteIdentifierImpl(null, type, null)); + return routeIds; + } }