+ ///
+ ///private factories to create Mapper
+ ///
+
+ /**
+ * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+ *
+ * @param routeId the rpc
+ * @param sender client who asked to find the routers.
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+
+ if (replyMessage instanceof GetAllBucketsReply) {
+
+ GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
+ Map<Address, Bucket> buckets = reply.getBuckets();
+
+ if (buckets == null || buckets.isEmpty()) {
+ sender.tell(createEmptyReply(), getSelf());
+ return null;
+ }
+
+ sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
+ * it updates the local bucket in bucket store.
+ *
+ * @param routeIds rpc to remote
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetLocalBucketReply) {
+
+ GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+ Bucket<RoutingTable> bucket = reply.getBucket();
+
+ if (bucket == null) {
+ log.debug("Local bucket is null");
+ return null;
+ }
+
+ RoutingTable table = bucket.getData();
+ if (table == null)
+ table = new RoutingTable();
+
+ table.setRouter(localRouter);
+
+ if (!table.isEmpty()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.removeRoute(routeId);
+ }
+ }
+ bucket.setData(table);
+
+ UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+ bucketStore.tell(updateBucketMessage, getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
+ * it updates the local bucket in bucket store.
+ *
+ * @param routeIds rpc to add
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
+
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetLocalBucketReply) {
+
+ GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+ Bucket<RoutingTable> bucket = reply.getBucket();
+
+ if (bucket == null) {
+ log.debug("Local bucket is null");
+ return null;
+ }
+
+ RoutingTable table = bucket.getData();
+ if (table == null)
+ table = new RoutingTable();
+
+ table.setRouter(localRouter);
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.addRoute(routeId);
+ }
+
+ bucket.setData(table);
+
+ UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+ bucketStore.tell(updateBucketMessage, getSelf());
+ }
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * All messages used by the RpcRegistry
+ */
+ public static class Messages {
+
+
+ public static class ContainsRoute {
+ final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
+
+ public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ Preconditions.checkArgument(routeIdentifiers != null &&
+ !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.routeIdentifiers = routeIdentifiers;
+ }
+
+ public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+ return this.routeIdentifiers;
+ }
+
+ @Override
+ public String toString() {
+ return "ContainsRoute{" +
+ "routeIdentifiers=" + routeIdentifiers +
+ '}';
+ }
+ }
+
+ public static class AddOrUpdateRoutes extends ContainsRoute {
+
+ public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
+ }
+
+ public static class RemoveRoutes extends ContainsRoute {
+
+ public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
+ }
+
+ public static class SetLocalRouter {
+ private final ActorRef router;
+
+ public SetLocalRouter(ActorRef router) {
+ Preconditions.checkArgument(router != null, "Router must not be null");
+ this.router = router;
+ }
+
+ public ActorRef getRouter() {
+ return this.router;
+ }
+
+ @Override
+ public String toString() {
+ return "SetLocalRouter{" +
+ "router=" + router +
+ '}';
+ }
+ }
+
+ public static class FindRouters {
+ private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
+ public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+ Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+ return routeIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRouters{" +
+ "routeIdentifier=" + routeIdentifier +
+ '}';
+ }
+ }
+
+ public static class FindRoutersReply {
+ final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+
+ public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+ Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
+ this.routerWithUpdateTime = routerWithUpdateTime;
+ }
+
+ public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
+ return routerWithUpdateTime;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRoutersReply{" +
+ "routerWithUpdateTime=" + routerWithUpdateTime +
+ '}';
+ }
+ }