Merge "Optimizations, Monitoring and Logging"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter;
24 import scala.concurrent.Future;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
40
41 /**
42  * Registry to look up cluster nodes that have registered for a given rpc.
43  * <p/>
44  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
45  * cluster wide information.
46  */
47 public class RpcRegistry extends UntypedActor {
48
49     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
50
51     /**
52      * Store to keep the registry. Bucket store sync's it across nodes in the cluster
53      */
54     private ActorRef bucketStore;
55
56     /**
57      * Rpc broker that would use the registry to route requests.
58      */
59     private ActorRef localRouter;
60
61     public RpcRegistry() {
62         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
63     }
64
65     public RpcRegistry(ActorRef bucketStore) {
66         this.bucketStore = bucketStore;
67     }
68
69     @Override
70     public void onReceive(Object message) throws Exception {
71
72         log.debug("Received message: message [{}]", message);
73
74         //TODO: if sender is remote, reject message
75
76         if (message instanceof SetLocalRouter)
77             receiveSetLocalRouter((SetLocalRouter) message);
78
79         if (message instanceof AddOrUpdateRoutes)
80             receiveAddRoutes((AddOrUpdateRoutes) message);
81
82         else if (message instanceof RemoveRoutes)
83             receiveRemoveRoutes((RemoveRoutes) message);
84
85         else if (message instanceof Messages.FindRouters)
86             receiveGetRouter((FindRouters) message);
87
88         else
89             unhandled(message);
90     }
91
92     /**
93      * Register's rpc broker
94      *
95      * @param message contains {@link akka.actor.ActorRef} for rpc broker
96      */
97     private void receiveSetLocalRouter(SetLocalRouter message) {
98         localRouter = message.getRouter();
99     }
100
101     /**
102      * @param msg
103      */
104     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
105
106         Preconditions.checkState(localRouter != null, "Router must be set first");
107
108         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
109         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
110     }
111
112     /**
113      * @param msg contains list of route ids to remove
114      */
115     private void receiveRemoveRoutes(RemoveRoutes msg) {
116
117         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
118         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
119
120     }
121
122     /**
123      * Finds routers for the given rpc.
124      *
125      * @param msg
126      */
127     private void receiveGetRouter(FindRouters msg) {
128         final ActorRef sender = getSender();
129
130         Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
131         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
132     }
133
134     /**
135      * Helper to create empty reply when no routers are found
136      *
137      * @return
138      */
139     private Messages.FindRoutersReply createEmptyReply() {
140         List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
141         return new Messages.FindRoutersReply(routerWithUpdateTime);
142     }
143
144     /**
145      * Helper to create a reply when routers are found for the given rpc
146      *
147      * @param buckets
148      * @param routeId
149      * @return
150      */
151     private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
152
153         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
154         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
155
156         for (Bucket bucket : buckets.values()) {
157
158             RoutingTable table = (RoutingTable) bucket.getData();
159             if (table == null)
160                 continue;
161
162             routerWithUpdateTime = table.getRouterFor(routeId);
163             if (routerWithUpdateTime.isEmpty())
164                 continue;
165
166             routers.add(routerWithUpdateTime.get());
167         }
168
169         return new Messages.FindRoutersReply(routers);
170     }
171
172
173     ///
174     ///private factories to create Mapper
175     ///
176
177     /**
178      * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
179      *
180      * @param routeId the rpc
181      * @param sender  client who asked to find the routers.
182      * @return
183      */
184     private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
185         return new Mapper<Object, Void>() {
186             @Override
187             public Void apply(Object replyMessage) {
188
189                 if (replyMessage instanceof GetAllBucketsReply) {
190
191                     GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
192                     Map<Address, Bucket> buckets = reply.getBuckets();
193
194                     if (buckets == null || buckets.isEmpty()) {
195                         sender.tell(createEmptyReply(), getSelf());
196                         return null;
197                     }
198
199                     sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
200                 }
201                 return null;
202             }
203         };
204     }
205
206     /**
207      * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
208      * it updates the local bucket in bucket store.
209      *
210      * @param routeIds rpc to remote
211      * @return
212      */
213     private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
214         return new Mapper<Object, Void>() {
215             @Override
216             public Void apply(Object replyMessage) {
217                 if (replyMessage instanceof GetLocalBucketReply) {
218
219                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
220                     Bucket<RoutingTable> bucket = reply.getBucket();
221
222                     if (bucket == null) {
223                         log.debug("Local bucket is null");
224                         return null;
225                     }
226
227                     RoutingTable table = bucket.getData();
228                     if (table == null)
229                         table = new RoutingTable();
230
231                     table.setRouter(localRouter);
232
233                     if (!table.isEmpty()) {
234                         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
235                             table.removeRoute(routeId);
236                         }
237                     }
238                     bucket.setData(table);
239
240                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
241                     bucketStore.tell(updateBucketMessage, getSelf());
242                 }
243                 return null;
244             }
245         };
246     }
247
248     /**
249      * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
250      * it updates the local bucket in bucket store.
251      *
252      * @param routeIds rpc to add
253      * @return
254      */
255     private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
256
257         return new Mapper<Object, Void>() {
258             @Override
259             public Void apply(Object replyMessage) {
260                 if (replyMessage instanceof GetLocalBucketReply) {
261
262                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
263                     Bucket<RoutingTable> bucket = reply.getBucket();
264
265                     if (bucket == null) {
266                         log.debug("Local bucket is null");
267                         return null;
268                     }
269
270                     RoutingTable table = bucket.getData();
271                     if (table == null)
272                         table = new RoutingTable();
273
274                     table.setRouter(localRouter);
275                     for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
276                         table.addRoute(routeId);
277                     }
278
279                     bucket.setData(table);
280
281                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
282                     bucketStore.tell(updateBucketMessage, getSelf());
283                 }
284
285                 return null;
286             }
287         };
288     }
289
290     /**
291      * All messages used by the RpcRegistry
292      */
293     public static class Messages {
294
295
296         public static class ContainsRoute {
297             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
298
299             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
300                 Preconditions.checkArgument(routeIdentifiers != null &&
301                                             !routeIdentifiers.isEmpty(),
302                                             "Route Identifiers must be supplied");
303                 this.routeIdentifiers = routeIdentifiers;
304             }
305
306             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
307                 return this.routeIdentifiers;
308             }
309
310             @Override
311             public String toString() {
312                 return "ContainsRoute{" +
313                         "routeIdentifiers=" + routeIdentifiers +
314                         '}';
315             }
316         }
317
318         public static class AddOrUpdateRoutes extends ContainsRoute {
319
320             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
321                 super(routeIdentifiers);
322             }
323         }
324
325         public static class RemoveRoutes extends ContainsRoute {
326
327             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
328                 super(routeIdentifiers);
329             }
330         }
331
332         public static class SetLocalRouter {
333             private final ActorRef router;
334
335             public SetLocalRouter(ActorRef router) {
336                 Preconditions.checkArgument(router != null, "Router must not be null");
337                 this.router = router;
338             }
339
340             public ActorRef getRouter() {
341                 return this.router;
342             }
343
344             @Override
345             public String toString() {
346                 return "SetLocalRouter{" +
347                         "router=" + router +
348                         '}';
349             }
350         }
351
352         public static class FindRouters {
353             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
354
355             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
356                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
357                 this.routeIdentifier = routeIdentifier;
358             }
359
360             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
361                 return routeIdentifier;
362             }
363
364             @Override
365             public String toString() {
366                 return "FindRouters{" +
367                         "routeIdentifier=" + routeIdentifier +
368                         '}';
369             }
370         }
371
372         public static class FindRoutersReply {
373             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
374
375             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
376                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
377                 this.routerWithUpdateTime = routerWithUpdateTime;
378             }
379
380             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
381                 return routerWithUpdateTime;
382             }
383
384             @Override
385             public String toString() {
386                 return "FindRoutersReply{" +
387                         "routerWithUpdateTime=" + routerWithUpdateTime +
388                         '}';
389             }
390         }
391     }
392 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.