2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter;
24 import scala.concurrent.Future;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
42 * Registry to look up cluster nodes that have registered for a given rpc.
44 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
45 * cluster wide information.
47 public class RpcRegistry extends UntypedActor {
49 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
52 * Store to keep the registry. Bucket store sync's it across nodes in the cluster
54 private ActorRef bucketStore;
57 * Rpc broker that would use the registry to route requests.
59 private ActorRef localRouter;
61 public RpcRegistry() {
62 bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
65 public RpcRegistry(ActorRef bucketStore) {
66 this.bucketStore = bucketStore;
70 public void onReceive(Object message) throws Exception {
72 log.debug("Received message: message [{}]", message);
74 //TODO: if sender is remote, reject message
76 if (message instanceof SetLocalRouter)
77 receiveSetLocalRouter((SetLocalRouter) message);
79 if (message instanceof AddOrUpdateRoutes)
80 receiveAddRoutes((AddOrUpdateRoutes) message);
82 else if (message instanceof RemoveRoutes)
83 receiveRemoveRoutes((RemoveRoutes) message);
85 else if (message instanceof Messages.FindRouters)
86 receiveGetRouter((FindRouters) message);
93 * Register's rpc broker
95 * @param message contains {@link akka.actor.ActorRef} for rpc broker
97 private void receiveSetLocalRouter(SetLocalRouter message) {
98 localRouter = message.getRouter();
104 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
106 Preconditions.checkState(localRouter != null, "Router must be set first");
108 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
109 futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
113 * @param msg contains list of route ids to remove
115 private void receiveRemoveRoutes(RemoveRoutes msg) {
117 Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
118 futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
123 * Finds routers for the given rpc.
127 private void receiveGetRouter(FindRouters msg) {
128 final ActorRef sender = getSender();
130 Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
131 futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
135 * Helper to create empty reply when no routers are found
139 private Messages.FindRoutersReply createEmptyReply() {
140 List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
141 return new Messages.FindRoutersReply(routerWithUpdateTime);
145 * Helper to create a reply when routers are found for the given rpc
151 private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
153 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
154 Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
156 for (Bucket bucket : buckets.values()) {
158 RoutingTable table = (RoutingTable) bucket.getData();
162 routerWithUpdateTime = table.getRouterFor(routeId);
163 if (routerWithUpdateTime.isEmpty())
166 routers.add(routerWithUpdateTime.get());
169 return new Messages.FindRoutersReply(routers);
174 ///private factories to create Mapper
178 * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
180 * @param routeId the rpc
181 * @param sender client who asked to find the routers.
184 private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
185 return new Mapper<Object, Void>() {
187 public Void apply(Object replyMessage) {
189 if (replyMessage instanceof GetAllBucketsReply) {
191 GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
192 Map<Address, Bucket> buckets = reply.getBuckets();
194 if (buckets == null || buckets.isEmpty()) {
195 sender.tell(createEmptyReply(), getSelf());
199 sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
207 * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
208 * it updates the local bucket in bucket store.
210 * @param routeIds rpc to remote
213 private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
214 return new Mapper<Object, Void>() {
216 public Void apply(Object replyMessage) {
217 if (replyMessage instanceof GetLocalBucketReply) {
219 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
220 Bucket<RoutingTable> bucket = reply.getBucket();
222 if (bucket == null) {
223 log.debug("Local bucket is null");
227 RoutingTable table = bucket.getData();
229 table = new RoutingTable();
231 table.setRouter(localRouter);
233 if (!table.isEmpty()) {
234 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
235 table.removeRoute(routeId);
238 bucket.setData(table);
240 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
241 bucketStore.tell(updateBucketMessage, getSelf());
249 * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
250 * it updates the local bucket in bucket store.
252 * @param routeIds rpc to add
255 private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
257 return new Mapper<Object, Void>() {
259 public Void apply(Object replyMessage) {
260 if (replyMessage instanceof GetLocalBucketReply) {
262 GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
263 Bucket<RoutingTable> bucket = reply.getBucket();
265 if (bucket == null) {
266 log.debug("Local bucket is null");
270 RoutingTable table = bucket.getData();
272 table = new RoutingTable();
274 table.setRouter(localRouter);
275 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
276 table.addRoute(routeId);
279 bucket.setData(table);
281 UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
282 bucketStore.tell(updateBucketMessage, getSelf());
291 * All messages used by the RpcRegistry
293 public static class Messages {
296 public static class ContainsRoute {
297 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
299 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
300 Preconditions.checkArgument(routeIdentifiers != null &&
301 !routeIdentifiers.isEmpty(),
302 "Route Identifiers must be supplied");
303 this.routeIdentifiers = routeIdentifiers;
306 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
307 return this.routeIdentifiers;
311 public String toString() {
312 return "ContainsRoute{" +
313 "routeIdentifiers=" + routeIdentifiers +
318 public static class AddOrUpdateRoutes extends ContainsRoute {
320 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
321 super(routeIdentifiers);
325 public static class RemoveRoutes extends ContainsRoute {
327 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
328 super(routeIdentifiers);
332 public static class SetLocalRouter {
333 private final ActorRef router;
335 public SetLocalRouter(ActorRef router) {
336 Preconditions.checkArgument(router != null, "Router must not be null");
337 this.router = router;
340 public ActorRef getRouter() {
345 public String toString() {
346 return "SetLocalRouter{" +
352 public static class FindRouters {
353 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
355 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
356 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
357 this.routeIdentifier = routeIdentifier;
360 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
361 return routeIdentifier;
365 public String toString() {
366 return "FindRouters{" +
367 "routeIdentifier=" + routeIdentifier +
372 public static class FindRoutersReply {
373 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
375 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
376 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
377 this.routerWithUpdateTime = routerWithUpdateTime;
380 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
381 return routerWithUpdateTime;
385 public String toString() {
386 return "FindRoutersReply{" +
387 "routerWithUpdateTime=" + routerWithUpdateTime +