Merge "Patch to eliminate sleeps in RemoteRpc tests"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter;
24 import scala.concurrent.Future;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
40
41 /**
42  * Registry to look up cluster nodes that have registered for a given rpc.
43  * <p/>
44  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
45  * cluster wide information.
46  */
47 public class RpcRegistry extends UntypedActor {
48
49     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
50
51     /**
52      * Store to keep the registry. Bucket store sync's it across nodes in the cluster
53      */
54     private ActorRef bucketStore;
55
56     /**
57      * Rpc broker that would use the registry to route requests.
58      */
59     private ActorRef localRouter;
60
61     public RpcRegistry() {
62         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
63
64         log.info("Bucket store path = {}", bucketStore.path().toString());
65     }
66
67     public RpcRegistry(ActorRef bucketStore) {
68         this.bucketStore = bucketStore;
69     }
70
71     @Override
72     public void onReceive(Object message) throws Exception {
73
74         log.debug("Received message: message [{}]", message);
75
76         //TODO: if sender is remote, reject message
77
78         if (message instanceof SetLocalRouter)
79             receiveSetLocalRouter((SetLocalRouter) message);
80
81         if (message instanceof AddOrUpdateRoutes)
82             receiveAddRoutes((AddOrUpdateRoutes) message);
83
84         else if (message instanceof RemoveRoutes)
85             receiveRemoveRoutes((RemoveRoutes) message);
86
87         else if (message instanceof Messages.FindRouters)
88             receiveGetRouter((FindRouters) message);
89
90         else
91             unhandled(message);
92     }
93
94     /**
95      * Register's rpc broker
96      *
97      * @param message contains {@link akka.actor.ActorRef} for rpc broker
98      */
99     private void receiveSetLocalRouter(SetLocalRouter message) {
100         localRouter = message.getRouter();
101     }
102
103     /**
104      * @param msg
105      */
106     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
107
108         Preconditions.checkState(localRouter != null, "Router must be set first");
109
110         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
111         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
112     }
113
114     /**
115      * @param msg contains list of route ids to remove
116      */
117     private void receiveRemoveRoutes(RemoveRoutes msg) {
118
119         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
120         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
121
122     }
123
124     /**
125      * Finds routers for the given rpc.
126      *
127      * @param msg
128      */
129     private void receiveGetRouter(FindRouters msg) {
130         final ActorRef sender = getSender();
131
132         Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
133         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
134     }
135
136     /**
137      * Helper to create empty reply when no routers are found
138      *
139      * @return
140      */
141     private Messages.FindRoutersReply createEmptyReply() {
142         List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
143         return new Messages.FindRoutersReply(routerWithUpdateTime);
144     }
145
146     /**
147      * Helper to create a reply when routers are found for the given rpc
148      *
149      * @param buckets
150      * @param routeId
151      * @return
152      */
153     private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
154
155         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
156         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
157
158         for (Bucket bucket : buckets.values()) {
159
160             RoutingTable table = (RoutingTable) bucket.getData();
161             if (table == null)
162                 continue;
163
164             routerWithUpdateTime = table.getRouterFor(routeId);
165             if (routerWithUpdateTime.isEmpty())
166                 continue;
167
168             routers.add(routerWithUpdateTime.get());
169         }
170
171         return new Messages.FindRoutersReply(routers);
172     }
173
174
175     ///
176     ///private factories to create Mapper
177     ///
178
179     /**
180      * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
181      *
182      * @param routeId the rpc
183      * @param sender  client who asked to find the routers.
184      * @return
185      */
186     private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
187         return new Mapper<Object, Void>() {
188             @Override
189             public Void apply(Object replyMessage) {
190
191                 if (replyMessage instanceof GetAllBucketsReply) {
192
193                     GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
194                     Map<Address, Bucket> buckets = reply.getBuckets();
195
196                     if (buckets == null || buckets.isEmpty()) {
197                         sender.tell(createEmptyReply(), getSelf());
198                         return null;
199                     }
200
201                     sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
202                 }
203                 return null;
204             }
205         };
206     }
207
208     /**
209      * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
210      * it updates the local bucket in bucket store.
211      *
212      * @param routeIds rpc to remote
213      * @return
214      */
215     private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
216         return new Mapper<Object, Void>() {
217             @Override
218             public Void apply(Object replyMessage) {
219                 if (replyMessage instanceof GetLocalBucketReply) {
220
221                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
222                     Bucket<RoutingTable> bucket = reply.getBucket();
223
224                     if (bucket == null) {
225                         log.debug("Local bucket is null");
226                         return null;
227                     }
228
229                     RoutingTable table = bucket.getData();
230                     if (table == null)
231                         table = new RoutingTable();
232
233                     table.setRouter(localRouter);
234
235                     if (!table.isEmpty()) {
236                         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
237                             table.removeRoute(routeId);
238                         }
239                     }
240                     bucket.setData(table);
241
242                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
243                     bucketStore.tell(updateBucketMessage, getSelf());
244                 }
245                 return null;
246             }
247         };
248     }
249
250     /**
251      * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
252      * it updates the local bucket in bucket store.
253      *
254      * @param routeIds rpc to add
255      * @return
256      */
257     private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
258
259         return new Mapper<Object, Void>() {
260             @Override
261             public Void apply(Object replyMessage) {
262                 if (replyMessage instanceof GetLocalBucketReply) {
263
264                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
265                     Bucket<RoutingTable> bucket = reply.getBucket();
266
267                     if (bucket == null) {
268                         log.debug("Local bucket is null");
269                         return null;
270                     }
271
272                     RoutingTable table = bucket.getData();
273                     if (table == null)
274                         table = new RoutingTable();
275
276                     table.setRouter(localRouter);
277                     for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
278                         table.addRoute(routeId);
279                     }
280
281                     bucket.setData(table);
282
283                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
284                     bucketStore.tell(updateBucketMessage, getSelf());
285                 }
286
287                 return null;
288             }
289         };
290     }
291
292     /**
293      * All messages used by the RpcRegistry
294      */
295     public static class Messages {
296
297
298         public static class ContainsRoute {
299             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
300
301             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
302                 Preconditions.checkArgument(routeIdentifiers != null &&
303                                             !routeIdentifiers.isEmpty(),
304                                             "Route Identifiers must be supplied");
305                 this.routeIdentifiers = routeIdentifiers;
306             }
307
308             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
309                 return this.routeIdentifiers;
310             }
311
312             @Override
313             public String toString() {
314                 return "ContainsRoute{" +
315                         "routeIdentifiers=" + routeIdentifiers +
316                         '}';
317             }
318         }
319
320         public static class AddOrUpdateRoutes extends ContainsRoute {
321
322             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
323                 super(routeIdentifiers);
324             }
325         }
326
327         public static class RemoveRoutes extends ContainsRoute {
328
329             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
330                 super(routeIdentifiers);
331             }
332         }
333
334         public static class SetLocalRouter {
335             private final ActorRef router;
336
337             public SetLocalRouter(ActorRef router) {
338                 Preconditions.checkArgument(router != null, "Router must not be null");
339                 this.router = router;
340             }
341
342             public ActorRef getRouter() {
343                 return this.router;
344             }
345
346             @Override
347             public String toString() {
348                 return "SetLocalRouter{" +
349                         "router=" + router +
350                         '}';
351             }
352         }
353
354         public static class FindRouters {
355             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
356
357             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
358                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
359                 this.routeIdentifier = routeIdentifier;
360             }
361
362             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
363                 return routeIdentifier;
364             }
365
366             @Override
367             public String toString() {
368                 return "FindRouters{" +
369                         "routeIdentifier=" + routeIdentifier +
370                         '}';
371             }
372         }
373
374         public static class FindRoutersReply {
375             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
376
377             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
378                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
379                 this.routerWithUpdateTime = routerWithUpdateTime;
380             }
381
382             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
383                 return routerWithUpdateTime;
384             }
385
386             @Override
387             public String toString() {
388                 return "FindRoutersReply{" +
389                         "routerWithUpdateTime=" + routerWithUpdateTime +
390                         '}';
391             }
392         }
393     }
394 }