+ private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final TestKit testKit) {
+ bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ final Map<Address, Long> reply = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class);
+ return reply;
+ }
+
+ private static void verifyBucket(final Bucket<RoutingTable> bucket, final List<DOMRpcIdentifier> expRouteIds) {
+ RoutingTable table = bucket.getData();
+ assertNotNull("Bucket RoutingTable is null", table);
+ for (DOMRpcIdentifier r : expRouteIds) {
+ if (!table.contains(r)) {
+ fail("RoutingTable does not contain " + r + ". Actual: " + table);
+ }
+ }
+
+ assertEquals("RoutingTable size", expRouteIds.size(), table.size());
+ }
+
+ private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore,
+ final TestKit testKit, final Address... addresses) {
+ int numTries = 0;
+ while (true) {
+ bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class);
+
+ boolean foundAll = true;
+ for (Address addr : addresses) {
+ Bucket<RoutingTable> bucket = buckets.get(addr);
+ if (bucket == null) {
+ foundAll = false;
+ break;
+ }
+ }
+
+ if (foundAll) {
+ return buckets;
+ }
+
+ if (++numTries >= 50) {
+ fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + ", Actual: " + buckets);
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Test
+ public void testAddRoutesConcurrency() {
+ final TestKit testKit = new TestKit(node1);
+
+ final int nRoutes = 500;
+ final Collection<DOMRpcIdentifier> added = new ArrayList<>(nRoutes);
+ for (int i = 0; i < nRoutes; i++) {
+ final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true,
+ QName.create(URI.create("/mockrpc"), "type" + i)));
+ added.add(routeId);
+
+ //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)),
+ ActorRef.noSender());
+ }
+
+ int numTries = 0;
+ while (true) {
+ registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class);
+
+ Bucket<RoutingTable> localBucket = buckets.values().iterator().next();
+ RoutingTable table = localBucket.getData();
+ if (table != null && table.size() == nRoutes) {
+ for (DOMRpcIdentifier r : added) {
+ assertTrue("RoutingTable contains " + r, table.contains(r));
+ }
+
+ break;
+ }
+
+ if (++numTries >= 50) {
+ fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private List<DOMRpcIdentifier> createRouteIds() {
+ QName type = QName.create(URI.create("/mockrpc"), "mockrpc" + routeIdCounter++);
+ List<DOMRpcIdentifier> routeIds = new ArrayList<>(1);
+ routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type)));
+ return routeIds;
+ }