2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter;
24 import scala.concurrent.Future;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
42 * Registry to look up cluster nodes that have registered for a given rpc.
44 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
45 * cluster wide information.
47 public class RpcRegistry extends UntypedActor {
/** Logging adapter obtained from {@link Logging#getLogger} for this actor's system. */
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

/**
 * Store to keep the registry. Bucket store syncs it across nodes in the cluster.
 * Either created as a child actor by the no-arg constructor or supplied by the caller.
 */
private ActorRef bucketStore;

/**
 * Rpc broker that would use the registry to route requests.
 * Stays {@code null} until a {@code SetLocalRouter} message has been received.
 */
private ActorRef localRouter;
61 public RpcRegistry() {
62 bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
64 log.info("Bucket store path = {}", bucketStore.path().toString());
67 public RpcRegistry(ActorRef bucketStore) {
68 this.bucketStore = bucketStore;
72 public void onReceive(Object message) throws Exception {
74 log.debug("Received message: message [{}]", message);
76 //TODO: if sender is remote, reject message
78 if (message instanceof SetLocalRouter)
79 receiveSetLocalRouter((SetLocalRouter) message);
81 if (message instanceof AddOrUpdateRoutes)
82 receiveAddRoutes((AddOrUpdateRoutes) message);
84 else if (message instanceof RemoveRoutes)
85 receiveRemoveRoutes((RemoveRoutes) message);
87 else if (message instanceof Messages.FindRouters)
88 receiveGetRouter((FindRouters) message);
95 * Register's rpc broker
97 * @param message contains {@link akka.actor.ActorRef} for rpc broker
99 private void receiveSetLocalRouter(SetLocalRouter message) {
100 localRouter = message.getRouter();
106 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
108 Preconditions.checkState(localRouter != null, "Router must be set first");
110 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
111 futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
115 * @param msg contains list of route ids to remove
117 private void receiveRemoveRoutes(RemoveRoutes msg) {
119 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
120 futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
125 * Finds routers for the given rpc.
129 private void receiveGetRouter(FindRouters msg) {
130 final ActorRef sender = getSender();
132 Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
133 futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
137 * Helper to create empty reply when no routers are found
141 private Messages.FindRoutersReply createEmptyReply() {
142 List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
143 return new Messages.FindRoutersReply(routerWithUpdateTime);
147 * Helper to create a reply when routers are found for the given rpc
153 private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
155 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
156 Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
158 for (Bucket bucket : buckets.values()) {
160 RoutingTable table = (RoutingTable) bucket.getData();
164 routerWithUpdateTime = table.getRouterFor(routeId);
165 if (routerWithUpdateTime.isEmpty())
168 routers.add(routerWithUpdateTime.get());
171 return new Messages.FindRoutersReply(routers);
176 ///private factories to create Mapper
180 * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
182 * @param routeId the rpc
183 * @param sender client who asked to find the routers.
186 private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
187 return new Mapper<Object, Void>() {
189 public Void apply(Object replyMessage) {
191 if (replyMessage instanceof GetAllBucketsReply) {
193 GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
194 Map<Address, Bucket> buckets = reply.getBuckets();
196 if (buckets == null || buckets.isEmpty()) {
197 sender.tell(createEmptyReply(), getSelf());
201 sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
209 * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
210 * it updates the local bucket in bucket store.
212 * @param routeIds rpc to remote
215 private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
216 return new Mapper<Object, Void>() {
218 public Void apply(Object replyMessage) {
219 if (replyMessage instanceof GetLocalBucketReply) {
221 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
222 Bucket<RoutingTable> bucket = reply.getBucket();
224 if (bucket == null) {
225 log.debug("Local bucket is null");
229 RoutingTable table = bucket.getData();
231 table = new RoutingTable();
233 table.setRouter(localRouter);
235 if (!table.isEmpty()) {
236 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
237 table.removeRoute(routeId);
240 bucket.setData(table);
242 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
243 bucketStore.tell(updateBucketMessage, getSelf());
251 * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
252 * it updates the local bucket in bucket store.
254 * @param routeIds rpc to add
257 private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
259 return new Mapper<Object, Void>() {
261 public Void apply(Object replyMessage) {
262 if (replyMessage instanceof GetLocalBucketReply) {
264 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
265 Bucket<RoutingTable> bucket = reply.getBucket();
267 if (bucket == null) {
268 log.debug("Local bucket is null");
272 RoutingTable table = bucket.getData();
274 table = new RoutingTable();
276 table.setRouter(localRouter);
277 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
278 table.addRoute(routeId);
281 bucket.setData(table);
283 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
284 bucketStore.tell(updateBucketMessage, getSelf());
293 * All messages used by the RpcRegistry
295 public static class Messages {
298 public static class ContainsRoute {
299 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
301 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
302 Preconditions.checkArgument(routeIdentifiers != null &&
303 !routeIdentifiers.isEmpty(),
304 "Route Identifiers must be supplied");
305 this.routeIdentifiers = routeIdentifiers;
308 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
309 return this.routeIdentifiers;
313 public String toString() {
314 return "ContainsRoute{" +
315 "routeIdentifiers=" + routeIdentifiers +
320 public static class AddOrUpdateRoutes extends ContainsRoute {
322 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
323 super(routeIdentifiers);
327 public static class RemoveRoutes extends ContainsRoute {
329 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
330 super(routeIdentifiers);
334 public static class SetLocalRouter {
335 private final ActorRef router;
337 public SetLocalRouter(ActorRef router) {
338 Preconditions.checkArgument(router != null, "Router must not be null");
339 this.router = router;
342 public ActorRef getRouter() {
347 public String toString() {
348 return "SetLocalRouter{" +
354 public static class FindRouters {
355 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
357 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
358 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
359 this.routeIdentifier = routeIdentifier;
362 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
363 return routeIdentifier;
367 public String toString() {
368 return "FindRouters{" +
369 "routeIdentifier=" + routeIdentifier +
374 public static class FindRoutersReply {
375 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
377 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
378 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
379 this.routerWithUpdateTime = routerWithUpdateTime;
382 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
383 return routerWithUpdateTime;
387 public String toString() {
388 return "FindRoutersReply{" +
389 "routerWithUpdateTime=" + routerWithUpdateTime +