X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistryTest.java;h=4ae6e1e57f82c35486a4e58f829cee955d6891be;hp=e6793741a3ec2ef9030a2e469058ab92de93c153;hb=927bce5688e4b9d33d3e5e9b769d8a0dba5ccdd4;hpb=51e91f6bdcc88c5aa96f956e516d31dbb5e5d5e0 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index e6793741a3..4ae6e1e57f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,286 +1,412 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.remote.rpc.registry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; -import akka.actor.ActorPath; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import akka.actor.ChildActorPath; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import com.google.common.base.Predicate; +import akka.actor.Address; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.UniqueAddress; +import akka.testkit.javadsl.TestKit; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; - +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.utils.ConditionalProbe; +import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; +public class RpcRegistryTest { + private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryTest.class); + + private static ActorSystem node1; + private static ActorSystem node2; + private static ActorSystem node3; + + private TestKit invoker1; + private TestKit invoker2; + private TestKit invoker3; + private TestKit registrar1; + private TestKit registrar2; + private TestKit registrar3; + private ActorRef registry1; + private ActorRef registry2; + private ActorRef registry3; + + private int routeIdCounter = 1; + + @BeforeClass + public static void staticSetup() { + AkkaConfigurationReader reader = ConfigFactory::load; + + RemoteOpsProviderConfig config1 = new RemoteOpsProviderConfig.Builder("memberA").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteOpsProviderConfig config2 = new RemoteOpsProviderConfig.Builder("memberB").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteOpsProviderConfig config3 = new RemoteOpsProviderConfig.Builder("memberC").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + node1 = ActorSystem.create("opendaylight-rpc", config1.get()); + node2 = ActorSystem.create("opendaylight-rpc", config2.get()); + node3 = ActorSystem.create("opendaylight-rpc", config3.get()); + + waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + } -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; + static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) { + Set otherMembersSet = Sets.newHashSet(addresses); + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 10) { + CurrentClusterState state = Cluster.get(node).state(); + for (Member m : state.getMembers()) { + if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) + && otherMembersSet.isEmpty()) { + return; + } + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } -public class RpcRegistryTest { + fail("Member(s) " + otherMembersSet + " are not Up"); + } - private static ActorSystem node1; - private static ActorSystem node2; - private static ActorSystem node3; - - private ActorRef registry1; - private ActorRef registry2; - private ActorRef registry3; - - @BeforeClass - public static void setup() throws InterruptedException { - node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); - node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); - node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC")); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(node1); - JavaTestKit.shutdownActorSystem(node2); - JavaTestKit.shutdownActorSystem(node3); - if (node1 != null) - node1.shutdown(); - if (node2 != null) - node2.shutdown(); - if (node3 != null) - node3.shutdown(); - - } - - @Before - public void createRpcRegistry() throws InterruptedException { - registry1 = node1.actorOf(Props.create(RpcRegistry.class)); - registry2 = node2.actorOf(Props.create(RpcRegistry.class)); - registry3 = node3.actorOf(Props.create(RpcRegistry.class)); - } - - @After - public void stopRpcRegistry() throws InterruptedException { - if (registry1 != null) - node1.stop(registry1); - if (registry2 != null) - node2.stop(registry2); - if (registry3 != null) - node3.stop(registry3); - } - - /** - * One node cluster. - * 1. Register rpc, ensure router can be found - * 2. Then remove rpc, ensure its deleted - * - * @throws URISyntaxException - * @throws InterruptedException - */ - @Test - public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException { - validateSystemStartup(); - - final JavaTestKit mockBroker = new JavaTestKit(node1); - - final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store"); - - //install probe - final JavaTestKit probe1 = createProbeForMessage( - node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class); - - //Add rpc on node 1 - registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker.getRef()); - - //Bucket store should get an update bucket message. Updated bucket contains added rpc. - probe1.expectMsgClass( - FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); - - //Now remove rpc - registry1.tell(getRemoveRouteMessage(), mockBroker.getRef()); - - //Bucket store should get an update bucket message. Rpc is removed in the updated bucket - probe1.expectMsgClass( - FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); - - - } - - - /** - * Three node cluster. - * 1. Register rpc on 1 node, ensure 2nd node gets updated - * 2. Remove rpc on 1 node, ensure 2nd node gets updated - * - * @throws URISyntaxException - * @throws InterruptedException - */ - @Test - public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException { + @AfterClass + public static void staticTeardown() { + TestKit.shutdownActorSystem(node1); + TestKit.shutdownActorSystem(node2); + TestKit.shutdownActorSystem(node3); + } - validateSystemStartup(); - - final JavaTestKit mockBroker1 = new JavaTestKit(node1); - - //install probe on node2's bucket store - final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store"); - final JavaTestKit probe2 = createProbeForMessage( - node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + @Before + public void setup() { + invoker1 = new TestKit(node1); + registrar1 = new TestKit(node1); + registry1 = node1.actorOf(RpcRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef())); + invoker2 = new TestKit(node2); + registrar2 = new TestKit(node2); + registry2 = node2.actorOf(RpcRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef())); + invoker3 = new TestKit(node3); + registrar3 = new TestKit(node3); + registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef())); + } + private static RemoteOpsProviderConfig config(final ActorSystem node) { + return new RemoteOpsProviderConfig(node.settings().config()); + } - //Add rpc on node 1 - registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + @After + public void teardown() { + if (registry1 != null) { + node1.stop(registry1); + } + if (registry2 != null) { + node2.stop(registry2); + } + if (registry3 != null) { + node3.stop(registry3); + } - //Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass( - FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + if (invoker1 != null) { + node1.stop(invoker1.getRef()); + } + if (invoker2 != null) { + node2.stop(invoker2.getRef()); + } + if (invoker3 != null) { + node3.stop(invoker3.getRef()); + } - //Now remove - registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef()); + if (registrar1 != null) { + node1.stop(registrar1.getRef()); + } + if (registrar2 != null) { + node2.stop(registrar2.getRef()); + } + if (registrar3 != null) { + node3.stop(registrar3.getRef()); + } + } - //Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass( - FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + /** + * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its + * deleted + */ + @Test + public void testAddRemoveRpcOnSameNode() { + LOG.info("testAddRemoveRpcOnSameNode starting"); - } + Address nodeAddress = node1.provider().getDefaultAddress(); - /** - * Three node cluster. - * Register rpc on 2 nodes. Ensure 3rd gets updated. - * - * @throws Exception - */ - @Test - public void testRpcAddedOnMultiNodes() throws Exception { + // Add rpc on node 1 - validateSystemStartup(); + List addedRouteIds = createRouteIds(); - final JavaTestKit mockBroker1 = new JavaTestKit(node1); - final JavaTestKit mockBroker2 = new JavaTestKit(node2); - final JavaTestKit mockBroker3 = new JavaTestKit(node3); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender()); - registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); + // Bucket store should get an update bucket message. Updated bucket contains added rpc. + final TestKit testKit = new TestKit(node1); - //install probe on node 3 - final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store"); - final JavaTestKit probe3 = createProbeForMessage( - node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + Map> buckets = retrieveBuckets(registry1, testKit, nodeAddress); + verifyBucket(buckets.get(nodeAddress), addedRouteIds); + Map versions = retrieveVersions(registry1, testKit); + assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(), + versions.get(nodeAddress)); - //Add rpc on node 1 - registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + // Now remove rpc + registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender()); - probe3.expectMsgClass( - FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + // Bucket store should get an update bucket message. Rpc is removed in the updated bucket + verifyEmptyBucket(testKit, registry1, nodeAddress); - //Add same rpc on node 2 - registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); - registry2.tell(getAddRouteMessage(), mockBroker2.getRef()); + LOG.info("testAddRemoveRpcOnSameNode ending"); - probe3.expectMsgClass( - FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); - } + } - private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) { - final JavaTestKit probe = new JavaTestKit(node); + /** + * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on + * 1 node, ensure 2nd node gets updated + */ + @Test + public void testRpcAddRemoveInCluster() { - ConditionalProbe conditionalProbe = - new ConditionalProbe(probe.getRef(), new Predicate() { - @Override - public boolean apply(@Nullable Object input) { - return clazz.equals(input.getClass()); - } - }); + LOG.info("testRpcAddRemoveInCluster starting"); - ActorSelection subject = node.actorSelection(subjectPath); - subject.tell(conditionalProbe, ActorRef.noSender()); + List addedRouteIds = createRouteIds(); - return probe; + Address node1Address = node1.provider().getDefaultAddress(); - } + // Add rpc on node 1 + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender()); - private void validateSystemStartup() throws InterruptedException { + // Bucket store on node2 should get a message to update its local copy of remote buckets + final TestKit testKit = new TestKit(node2); - ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper"); - ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper"); - ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper"); + Map> buckets = retrieveBuckets(registry2, testKit, node1Address); + verifyBucket(buckets.get(node1Address), addedRouteIds); - ActorSelection gossiper1 = node1.actorSelection(gossiper1Path); - ActorSelection gossiper2 = node2.actorSelection(gossiper2Path); - ActorSelection gossiper3 = node3.actorSelection(gossiper3Path); + // Now remove + registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender()); + // Bucket store on node2 should get a message to update its local copy of remote buckets. + // Wait for the bucket for node1 to be empty. - if (!resolveReference(gossiper1, gossiper2, gossiper3)) - Assert.fail("Could not find gossipers"); - } + verifyEmptyBucket(testKit, registry2, node1Address); - private Boolean resolveReference(ActorSelection... gossipers) { + LOG.info("testRpcAddRemoveInCluster ending"); + } - Boolean resolved = true; - for (int i = 0; i < 5; i++) { + private void verifyEmptyBucket(final TestKit testKit, final ActorRef registry, final Address address) + throws AssertionError { + Map> buckets; + int numTries = 0; + while (true) { + buckets = retrieveBuckets(registry1, testKit, address); + + try { + verifyBucket(buckets.get(address), Collections.emptyList()); + break; + } catch (AssertionError e) { + if (++numTries >= 50) { + throw e; + } + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } - resolved = true; - System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i); + /** + * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated. + */ + @Test + public void testRpcAddedOnMultiNodes() { + final TestKit testKit = new TestKit(node3); - for (ActorSelection gossiper : gossipers) { - ActorRef ref = null; + // Add rpc on node 1 + List addedRouteIds1 = createRouteIds(); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender()); - try { - Future future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS)); - ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS)); - } catch (Exception e) { - System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage()); - } + final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.ofSeconds(3), + UpdateRemoteEndpoints.class); + + // Add rpc on node 2 + List addedRouteIds2 = createRouteIds(); + registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender()); + + final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.ofSeconds(3), + UpdateRemoteEndpoints.class); + Address node2Address = node2.provider().getDefaultAddress(); + Address node1Address = node1.provider().getDefaultAddress(); - if (ref == null) - resolved = false; - } + Map> buckets = retrieveBuckets(registry3, testKit, node1Address, + node2Address); - if (resolved) break; + verifyBucket(buckets.get(node1Address), addedRouteIds1); + verifyBucket(buckets.get(node2Address), addedRouteIds2); + + Map versions = retrieveVersions(registry3, testKit); + assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(), + versions.get(node1Address)); + assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(), + versions.get(node2Address)); + + assertEndpoints(req1, node1Address, invoker1); + assertEndpoints(req2, node2Address, invoker2); } - return resolved; - } - private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException { - return new AddOrUpdateRoutes(createRouteIds()); - } + private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address, final TestKit invoker) { + final Map> endpoints = msg.getRpcEndpoints(); + assertEquals(1, endpoints.size()); - private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException { - return new RemoveRoutes(createRouteIds()); - } + final Optional maybeEndpoint = endpoints.get(address); + assertNotNull(maybeEndpoint); + assertTrue(maybeEndpoint.isPresent()); - private List> createRouteIds() throws URISyntaxException { - QName type = new QName(new URI("/mockrpc"), "mockrpc"); - List> routeIds = new ArrayList<>(); - routeIds.add(new RouteIdentifierImpl(null, type, null)); - return routeIds; - } + final RemoteRpcEndpoint endpoint = maybeEndpoint.get(); + final ActorRef router = endpoint.getRouter(); + assertNotNull(router); + + router.tell("hello", ActorRef.noSender()); + final String s = invoker.expectMsgClass(Duration.ofSeconds(3), String.class); + assertEquals("hello", s); + } + private static Map retrieveVersions(final ActorRef bucketStore, final TestKit testKit) { + bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef()); + @SuppressWarnings("unchecked") + final Map reply = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class); + return reply; + } + + private static void verifyBucket(final Bucket bucket, final List expRouteIds) { + RoutingTable table = bucket.getData(); + assertNotNull("Bucket RoutingTable is null", table); + for (DOMRpcIdentifier r : expRouteIds) { + if (!table.contains(r)) { + fail("RoutingTable does not contain " + r + ". Actual: " + table); + } + } + + assertEquals("RoutingTable size", expRouteIds.size(), table.size()); + } + + private static Map> retrieveBuckets(final ActorRef bucketStore, + final TestKit testKit, final Address... addresses) { + int numTries = 0; + while (true) { + bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef()); + @SuppressWarnings("unchecked") + Map> buckets = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class); + + boolean foundAll = true; + for (Address addr : addresses) { + Bucket bucket = buckets.get(addr); + if (bucket == null) { + foundAll = false; + break; + } + } + + if (foundAll) { + return buckets; + } + + if (++numTries >= 50) { + fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + ", Actual: " + buckets); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + + @Test + public void testAddRoutesConcurrency() { + final TestKit testKit = new TestKit(node1); + + final int nRoutes = 500; + final Collection added = new ArrayList<>(nRoutes); + for (int i = 0; i < nRoutes; i++) { + final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true, + QName.create(URI.create("/mockrpc"), "type" + i))); + added.add(routeId); + + //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)), + ActorRef.noSender()); + } + + int numTries = 0; + while (true) { + registry1.tell(GET_ALL_BUCKETS, testKit.getRef()); + @SuppressWarnings("unchecked") + Map> buckets = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class); + + Bucket localBucket = buckets.values().iterator().next(); + RoutingTable table = localBucket.getData(); + if (table != null && table.size() == nRoutes) { + for (DOMRpcIdentifier r : added) { + assertTrue("RoutingTable contains " + r, table.contains(r)); + } + + break; + } + + if (++numTries >= 50) { + fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + + private List createRouteIds() { + QName type = QName.create(URI.create("/mockrpc"), "mockrpc" + routeIdCounter++); + List routeIds = new ArrayList<>(1); + routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type))); + return routeIds; + } }