2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.dispatch.Mapper;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Option;
17 import akka.japi.Pair;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Preconditions;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
24 import org.opendaylight.controller.sal.connector.api.RpcRouter;
25 import scala.concurrent.Future;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
35 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
43 * Registry to look up cluster nodes that have registered for a given rpc.
45 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
46 * cluster wide information.
48 public class RpcRegistry extends AbstractUntypedActorWithMetering {
50 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
53 * Store to keep the registry. Bucket store sync's it across nodes in the cluster
55 private ActorRef bucketStore;
58 * Rpc broker that would use the registry to route requests.
60 private ActorRef localRouter;
62 private RemoteRpcProviderConfig config;
64 public RpcRegistry() {
65 bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
66 this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
67 log.info("Bucket store path = {}", bucketStore.path().toString());
70 public RpcRegistry(ActorRef bucketStore) {
71 this.bucketStore = bucketStore;
76 protected void handleReceive(Object message) throws Exception {
77 //TODO: if sender is remote, reject message
79 if (message instanceof SetLocalRouter)
80 receiveSetLocalRouter((SetLocalRouter) message);
82 if (message instanceof AddOrUpdateRoutes)
83 receiveAddRoutes((AddOrUpdateRoutes) message);
85 else if (message instanceof RemoveRoutes)
86 receiveRemoveRoutes((RemoveRoutes) message);
88 else if (message instanceof Messages.FindRouters)
89 receiveGetRouter((FindRouters) message);
96 * Register's rpc broker
98 * @param message contains {@link akka.actor.ActorRef} for rpc broker
100 private void receiveSetLocalRouter(SetLocalRouter message) {
101 localRouter = message.getRouter();
107 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
109 Preconditions.checkState(localRouter != null, "Router must be set first");
111 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
112 futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
116 * @param msg contains list of route ids to remove
118 private void receiveRemoveRoutes(RemoveRoutes msg) {
120 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
121 futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
126 * Finds routers for the given rpc.
130 private void receiveGetRouter(FindRouters msg) {
131 final ActorRef sender = getSender();
133 Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
134 futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
138 * Helper to create empty reply when no routers are found
142 private Messages.FindRoutersReply createEmptyReply() {
143 List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
144 return new Messages.FindRoutersReply(routerWithUpdateTime);
148 * Helper to create a reply when routers are found for the given rpc
154 private Messages.FindRoutersReply createReplyWithRouters(
155 Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
157 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
158 Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
160 for (Bucket bucket : buckets.values()) {
162 RoutingTable table = (RoutingTable) bucket.getData();
166 routerWithUpdateTime = table.getRouterFor(routeId);
167 if (routerWithUpdateTime.isEmpty())
170 routers.add(routerWithUpdateTime.get());
173 return new Messages.FindRoutersReply(routers);
178 ///private factories to create Mapper
182 * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
184 * @param routeId the rpc
185 * @param sender client who asked to find the routers.
188 private Mapper<Object, Void> getMapperToGetRouter(
189 final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
190 return new Mapper<Object, Void>() {
192 public Void apply(Object replyMessage) {
194 if (replyMessage instanceof GetAllBucketsReply) {
196 GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
197 Map<Address, Bucket> buckets = reply.getBuckets();
199 if (buckets == null || buckets.isEmpty()) {
200 sender.tell(createEmptyReply(), getSelf());
204 sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
212 * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
213 * it updates the local bucket in bucket store.
215 * @param routeIds rpc to remote
218 private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
219 return new Mapper<Object, Void>() {
221 public Void apply(Object replyMessage) {
222 if (replyMessage instanceof GetLocalBucketReply) {
224 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
225 Bucket<RoutingTable> bucket = reply.getBucket();
227 if (bucket == null) {
228 log.debug("Local bucket is null");
232 RoutingTable table = bucket.getData();
234 table = new RoutingTable();
236 table.setRouter(localRouter);
238 if (!table.isEmpty()) {
239 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
240 table.removeRoute(routeId);
243 bucket.setData(table);
245 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
246 bucketStore.tell(updateBucketMessage, getSelf());
254 * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
255 * it updates the local bucket in bucket store.
257 * @param routeIds rpc to add
260 private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
262 return new Mapper<Object, Void>() {
264 public Void apply(Object replyMessage) {
265 if (replyMessage instanceof GetLocalBucketReply) {
267 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
268 Bucket<RoutingTable> bucket = reply.getBucket();
270 if (bucket == null) {
271 log.debug("Local bucket is null");
275 RoutingTable table = bucket.getData();
277 table = new RoutingTable();
279 table.setRouter(localRouter);
280 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
281 table.addRoute(routeId);
284 bucket.setData(table);
286 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
287 bucketStore.tell(updateBucketMessage, getSelf());
296 * All messages used by the RpcRegistry
298 public static class Messages {
301 public static class ContainsRoute {
302 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
304 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
305 Preconditions.checkArgument(routeIdentifiers != null &&
306 !routeIdentifiers.isEmpty(),
307 "Route Identifiers must be supplied");
308 this.routeIdentifiers = routeIdentifiers;
311 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
312 return this.routeIdentifiers;
316 public String toString() {
317 return "ContainsRoute{" +
318 "routeIdentifiers=" + routeIdentifiers +
323 public static class AddOrUpdateRoutes extends ContainsRoute {
325 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
326 super(routeIdentifiers);
330 public static class RemoveRoutes extends ContainsRoute {
332 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
333 super(routeIdentifiers);
337 public static class SetLocalRouter {
338 private final ActorRef router;
340 public SetLocalRouter(ActorRef router) {
341 Preconditions.checkArgument(router != null, "Router must not be null");
342 this.router = router;
345 public ActorRef getRouter() {
350 public String toString() {
351 return "SetLocalRouter{" +
357 public static class FindRouters {
358 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
360 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
361 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
362 this.routeIdentifier = routeIdentifier;
365 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
366 return routeIdentifier;
370 public String toString() {
371 return "FindRouters{" +
372 "routeIdentifier=" + routeIdentifier +
377 public static class FindRoutersReply {
378 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
380 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
381 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
382 this.routerWithUpdateTime = routerWithUpdateTime;
385 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
386 return routerWithUpdateTime;
390 public String toString() {
391 return "FindRoutersReply{" +
392 "routerWithUpdateTime=" + routerWithUpdateTime +