-
- ///
- ///private factories to create Mapper
- ///
-
- /**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
- *
- * @param routeId the rpc
- * @param sender client who asked to find the routers.
- * @return
- */
- private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
-
- if (replyMessage instanceof GetAllBucketsReply) {
-
- GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
- Map<Address, Bucket> buckets = reply.getBuckets();
-
- if (buckets == null || buckets.isEmpty()) {
- sender.tell(createEmptyReply(), getSelf());
- return null;
- }
-
- sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+ for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+ final RoutingTable table = e.getValue().getData();
+
+ final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
+ for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
+ if (ri instanceof RouteIdentifierImpl) {
+ final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+ rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+ } else {
+ LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());