- ///
- ///private factories to create Mapper
- ///
-
- /**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
- *
- * @param routeId the rpc
- * @param sender client who asked to find the routers.
- * @return
- */
- private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
-
- if (replyMessage instanceof GetAllBucketsReply) {
-
- GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
- Map<Address, Bucket> buckets = reply.getBuckets();
-
- if (buckets == null || buckets.isEmpty()) {
- sender.tell(createEmptyReply(), getSelf());
- return null;
- }
-
- sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to remote
- * @return
- */
- private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
-
- if (!table.isEmpty()) {
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.removeRoute(routeId);
- }
- }
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to add
- * @return
- */
- private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.addRoute(routeId);
- }
-
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
-
- return null;
- }
- };