+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAddRoutesConcurrency() throws Exception {
+ final JavaTestKit testKit = new JavaTestKit(node1);
+
+ registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
+
+ final int nRoutes = 500;
+ final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+ for(int i = 0; i < nRoutes; i++) {
+ final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
+ new QName(new URI("/mockrpc"), "type" + i), null);
+ added[i] = routeId;
+
+ //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+ ActorRef.noSender());
+ }
+
+ GetAllBuckets getAllBuckets = new GetAllBuckets();
+ FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+ int nTries = 0;
+ while(true) {
+ registry1.tell(getAllBuckets, testKit.getRef());
+ GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+
+ Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+ RoutingTable table = localBucket.getData();
+ if(table != null && table.size() == nRoutes) {
+ for(RouteIdentifier<?, ?, ?> r: added) {
+ Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+ }
+
+ break;
+ }
+
+ if(++nTries >= 50) {
+ Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }