- }.get();
- Assert.assertTrue(getNullMsg);
- }};
-
- }
-
- /**
- This test add, read and remove an entry in routed rpc
- */
- @Test
- public void testRoutedRpc() throws URISyntaxException {
- new JavaTestKit(system) {{
- ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
- QName type = new QName(new URI("actor1"), "actor1");
- RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
- final String route = "actor1";
-
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
- routeIds.add(routeId);
-
- AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
- rpcRegistry.tell(rpcMsg, getRef());
- expectMsgEquals(duration("2 second"), "Success");
-
- GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
- rpcRegistry.tell(getRpc, getRef());
-
- Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
- protected Boolean match(Object in) {
- if (in instanceof GetRoutedRpcReply) {
- GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
- return route.equals(reply.getRoutePath());
- } else {
- throw noMatch();
- }
+ }
+
+ /**
+ * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
+ */
+ @Test
+ public void testRpcAddedOnMultiNodes() {
+ final TestKit testKit = new TestKit(node3);
+
+ // Add rpc on node 1
+ List<DOMRpcIdentifier> addedRouteIds1 = createRouteIds();
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());
+
+ final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.ofSeconds(3),
+ UpdateRemoteEndpoints.class);
+
+ // Add rpc on node 2
+ List<DOMRpcIdentifier> addedRouteIds2 = createRouteIds();
+ registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());
+
+ final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.ofSeconds(3),
+ UpdateRemoteEndpoints.class);
+ Address node2Address = node2.provider().getDefaultAddress();
+ Address node1Address = node1.provider().getDefaultAddress();
+
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
+ node2Address);
+
+ verifyBucket(buckets.get(node1Address), addedRouteIds1);
+ verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+ Map<Address, Long> versions = retrieveVersions(registry3, testKit);
+ assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
+ versions.get(node1Address));
+ assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
+ versions.get(node2Address));
+
+ assertEndpoints(req1, node1Address, invoker1);
+ assertEndpoints(req2, node2Address, invoker2);
+
+ }
+
+ private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address, final TestKit invoker) {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getRpcEndpoints();
+ assertEquals(1, endpoints.size());
+
+ final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
+ assertNotNull(maybeEndpoint);
+ assertTrue(maybeEndpoint.isPresent());
+
+ final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
+ final ActorRef router = endpoint.getRouter();
+ assertNotNull(router);
+
+ router.tell("hello", ActorRef.noSender());
+ final String s = invoker.expectMsgClass(Duration.ofSeconds(3), String.class);
+ assertEquals("hello", s);
+ }
+
+ private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final TestKit testKit) {
+ bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
+ @SuppressWarnings("unchecked")
+ final Map<Address, Long> reply = testKit.expectMsgClass(Duration.ofSeconds(3), Map.class);
+ return reply;
+ }
+
+ private static void verifyBucket(final Bucket<RoutingTable> bucket, final List<DOMRpcIdentifier> expRouteIds) {
+ RoutingTable table = bucket.getData();
+ assertNotNull("Bucket RoutingTable is null", table);
+ for (DOMRpcIdentifier r : expRouteIds) {
+ if (!table.contains(r)) {
+ fail("RoutingTable does not contain " + r + ". Actual: " + table);
+ }