2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
24 import org.opendaylight.controller.sal.connector.api.RpcRouter;
25 import scala.concurrent.Future;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
35 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
43 * Registry to look up cluster nodes that have registered for a given rpc.
45 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
46 * cluster wide information.
48 public class RpcRegistry extends UntypedActor {
50 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
53 * Store to keep the registry. Bucket store sync's it across nodes in the cluster
55 private ActorRef bucketStore;
58 * Rpc broker that would use the registry to route requests.
60 private ActorRef localRouter;
62 public RpcRegistry() {
63 bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
65 log.info("Bucket store path = {}", bucketStore.path().toString());
68 public RpcRegistry(ActorRef bucketStore) {
69 this.bucketStore = bucketStore;
73 public void onReceive(Object message) throws Exception {
75 log.debug("Received message: message [{}]", message);
77 //TODO: if sender is remote, reject message
79 if (message instanceof SetLocalRouter)
80 receiveSetLocalRouter((SetLocalRouter) message);
82 if (message instanceof AddOrUpdateRoutes)
83 receiveAddRoutes((AddOrUpdateRoutes) message);
85 else if (message instanceof RemoveRoutes)
86 receiveRemoveRoutes((RemoveRoutes) message);
88 else if (message instanceof Messages.FindRouters)
89 receiveGetRouter((FindRouters) message);
96 * Register's rpc broker
98 * @param message contains {@link akka.actor.ActorRef} for rpc broker
100 private void receiveSetLocalRouter(SetLocalRouter message) {
101 localRouter = message.getRouter();
107 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
109 Preconditions.checkState(localRouter != null, "Router must be set first");
111 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
112 futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
116 * @param msg contains list of route ids to remove
118 private void receiveRemoveRoutes(RemoveRoutes msg) {
120 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
121 futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
126 * Finds routers for the given rpc.
130 private void receiveGetRouter(FindRouters msg) {
131 final ActorRef sender = getSender();
133 Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
134 futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
138 * Helper to create empty reply when no routers are found
142 private Messages.FindRoutersReply createEmptyReply() {
143 List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
144 return new Messages.FindRoutersReply(routerWithUpdateTime);
148 * Helper to create a reply when routers are found for the given rpc
154 private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
156 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
157 Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
159 for (Bucket bucket : buckets.values()) {
161 RoutingTable table = (RoutingTable) bucket.getData();
165 routerWithUpdateTime = table.getRouterFor(routeId);
166 if (routerWithUpdateTime.isEmpty())
169 routers.add(routerWithUpdateTime.get());
172 return new Messages.FindRoutersReply(routers);
177 ///private factories to create Mapper
181 * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
183 * @param routeId the rpc
184 * @param sender client who asked to find the routers.
187 private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
188 return new Mapper<Object, Void>() {
190 public Void apply(Object replyMessage) {
192 if (replyMessage instanceof GetAllBucketsReply) {
194 GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
195 Map<Address, Bucket> buckets = reply.getBuckets();
197 if (buckets == null || buckets.isEmpty()) {
198 sender.tell(createEmptyReply(), getSelf());
202 sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
210 * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
211 * it updates the local bucket in bucket store.
213 * @param routeIds rpc to remote
216 private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
217 return new Mapper<Object, Void>() {
219 public Void apply(Object replyMessage) {
220 if (replyMessage instanceof GetLocalBucketReply) {
222 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
223 Bucket<RoutingTable> bucket = reply.getBucket();
225 if (bucket == null) {
226 log.debug("Local bucket is null");
230 RoutingTable table = bucket.getData();
232 table = new RoutingTable();
234 table.setRouter(localRouter);
236 if (!table.isEmpty()) {
237 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
238 table.removeRoute(routeId);
241 bucket.setData(table);
243 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
244 bucketStore.tell(updateBucketMessage, getSelf());
252 * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
253 * it updates the local bucket in bucket store.
255 * @param routeIds rpc to add
258 private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
260 return new Mapper<Object, Void>() {
262 public Void apply(Object replyMessage) {
263 if (replyMessage instanceof GetLocalBucketReply) {
265 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
266 Bucket<RoutingTable> bucket = reply.getBucket();
268 if (bucket == null) {
269 log.debug("Local bucket is null");
273 RoutingTable table = bucket.getData();
275 table = new RoutingTable();
277 table.setRouter(localRouter);
278 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
279 table.addRoute(routeId);
282 bucket.setData(table);
284 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
285 bucketStore.tell(updateBucketMessage, getSelf());
294 * All messages used by the RpcRegistry
296 public static class Messages {
299 public static class ContainsRoute {
300 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
302 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
303 Preconditions.checkArgument(routeIdentifiers != null &&
304 !routeIdentifiers.isEmpty(),
305 "Route Identifiers must be supplied");
306 this.routeIdentifiers = routeIdentifiers;
309 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
310 return this.routeIdentifiers;
314 public String toString() {
315 return "ContainsRoute{" +
316 "routeIdentifiers=" + routeIdentifiers +
321 public static class AddOrUpdateRoutes extends ContainsRoute {
323 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
324 super(routeIdentifiers);
328 public static class RemoveRoutes extends ContainsRoute {
330 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
331 super(routeIdentifiers);
335 public static class SetLocalRouter {
336 private final ActorRef router;
338 public SetLocalRouter(ActorRef router) {
339 Preconditions.checkArgument(router != null, "Router must not be null");
340 this.router = router;
343 public ActorRef getRouter() {
348 public String toString() {
349 return "SetLocalRouter{" +
355 public static class FindRouters {
356 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
358 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
359 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
360 this.routeIdentifier = routeIdentifier;
363 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
364 return routeIdentifier;
368 public String toString() {
369 return "FindRouters{" +
370 "routeIdentifier=" + routeIdentifier +
375 public static class FindRoutersReply {
376 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
378 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
379 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
380 this.routerWithUpdateTime = routerWithUpdateTime;
383 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
384 return routerWithUpdateTime;
388 public String toString() {
389 return "FindRoutersReply{" +
390 "routerWithUpdateTime=" + routerWithUpdateTime +