2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter;
24 import scala.concurrent.Future;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
41 * Registry to look up cluster nodes that have registered for a given rpc.
43 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
44 * cluster wide information.
47 public class RpcRegistry extends UntypedActor {
49 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
52 * Store to keep the registry. Bucket store sync's it across nodes in the cluster
54 private ActorRef bucketStore;
57 * Rpc broker that would use the registry to route requests.
59 private ActorRef localRouter;
61 public RpcRegistry() {
62 bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
65 public RpcRegistry(ActorRef bucketStore) {
66 this.bucketStore = bucketStore;
70 public void onReceive(Object message) throws Exception {
72 log.debug("Received message: message [{}]", message);
74 //TODO: if sender is remote, reject message
76 if (message instanceof SetLocalRouter)
77 receiveSetLocalRouter((SetLocalRouter) message);
79 if (message instanceof AddOrUpdateRoute)
80 receiveAddRoute((AddOrUpdateRoute) message);
82 else if (message instanceof RemoveRoute)
83 receiveRemoveRoute((RemoveRoute) message);
85 else if (message instanceof Messages.FindRouters)
86 receiveGetRouter((Messages.FindRouters) message);
93 * Register's rpc broker
95 * @param message contains {@link akka.actor.ActorRef} for rpc broker
97 private void receiveSetLocalRouter(SetLocalRouter message) {
98 if (message == null || message.getRouter() == null)
101 localRouter = message.getRouter();
105 * //TODO: update this to accept multiple route registration
108 private void receiveAddRoute(AddOrUpdateRoute msg) {
109 if (msg.getRouteIdentifier() == null)
112 Preconditions.checkState(localRouter != null, "Router must be set first");
114 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
115 futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
119 * //TODO: update this to accept multiple routes
122 private void receiveRemoveRoute(RemoveRoute msg) {
123 if (msg.getRouteIdentifier() == null)
126 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
127 futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
132 * Finds routers for the given rpc.
135 private void receiveGetRouter(Messages.FindRouters msg) {
136 final ActorRef sender = getSender();
138 //if empty message, return empty list
139 if (msg.getRouteIdentifier() == null) {
140 sender.tell(createEmptyReply(), getSelf());
144 Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
145 futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
150 * Helper to create empty reply when no routers are found
154 private Messages.FindRoutersReply createEmptyReply() {
155 List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
156 return new Messages.FindRoutersReply(routerWithUpdateTime);
160 * Helper to create a reply when routers are found for the given rpc
165 private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
167 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
169 Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
171 for (Bucket bucket : buckets.values()) {
173 RoutingTable table = (RoutingTable) bucket.getData();
178 routerWithUpdateTime = table.getRouterFor(routeId);
180 if (routerWithUpdateTime.isEmpty())
183 routers.add(routerWithUpdateTime.get());
186 return new Messages.FindRoutersReply(routers);
191 ///private factories to create Mapper
195 * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
197 * @param routeId the rpc
198 * @param sender client who asked to find the routers.
201 private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
202 return new Mapper<Object, Void>() {
204 public Void apply(Object replyMessage) {
206 if (replyMessage instanceof GetAllBucketsReply) {
208 GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
209 Map<Address, Bucket> buckets = reply.getBuckets();
211 if (buckets == null || buckets.isEmpty()) {
212 sender.tell(createEmptyReply(), getSelf());
216 sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
224 * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
225 * it updates the local bucket in bucket store.
227 * @param routeId rpc to remote
230 private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
231 return new Mapper<Object, Void>() {
233 public Void apply(Object replyMessage) {
234 if (replyMessage instanceof GetLocalBucketReply) {
236 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
237 Bucket<RoutingTable> bucket = reply.getBucket();
239 if (bucket == null) {
240 log.debug("Local bucket is null");
244 RoutingTable table = bucket.getData();
246 table = new RoutingTable();
248 table.setRouter(localRouter);
249 table.removeRoute(routeId);
251 bucket.setData(table);
253 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
254 bucketStore.tell(updateBucketMessage, getSelf());
262 * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
263 * it updates the local bucket in bucket store.
265 * @param routeId rpc to add
268 private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
270 return new Mapper<Object, Void>() {
272 public Void apply(Object replyMessage) {
273 if (replyMessage instanceof GetLocalBucketReply) {
275 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
276 Bucket<RoutingTable> bucket = reply.getBucket();
278 if (bucket == null) {
279 log.debug("Local bucket is null");
283 RoutingTable table = bucket.getData();
285 table = new RoutingTable();
287 table.setRouter(localRouter);
288 table.addRoute(routeId);
290 bucket.setData(table);
292 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
293 bucketStore.tell(updateBucketMessage, getSelf());
302 * All messages used by the RpcRegistry
304 public static class Messages {
307 public static class ContainsRoute {
308 final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
310 public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
311 Preconditions.checkArgument(routeIdentifier != null);
312 this.routeIdentifier = routeIdentifier;
315 public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
316 return this.routeIdentifier;
320 public String toString() {
321 return this.getClass().getSimpleName() + "{" +
322 "routeIdentifier=" + routeIdentifier +
327 public static class AddOrUpdateRoute extends ContainsRoute{
329 public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
330 super(routeIdentifier);
334 public static class RemoveRoute extends ContainsRoute {
336 public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
337 super(routeIdentifier);
341 public static class SetLocalRouter{
342 private final ActorRef router;
344 public SetLocalRouter(ActorRef router) {
345 this.router = router;
348 public ActorRef getRouter(){
353 public String toString() {
354 return "SetLocalRouter{" +
360 public static class FindRouters extends ContainsRoute {
361 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
362 super(routeIdentifier);
366 public static class FindRoutersReply {
367 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
369 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
370 this.routerWithUpdateTime = routerWithUpdateTime;
373 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
374 return routerWithUpdateTime;
378 public String toString() {
379 return "FindRoutersReply{" +
380 "routerWithUpdateTime=" + routerWithUpdateTime +