Merge "Small fix to xsql dependencies"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
24 import org.opendaylight.controller.sal.connector.api.RpcRouter;
25 import scala.concurrent.Future;
26
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Map;
31
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
35 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
41
42 /**
43  * Registry to look up cluster nodes that have registered for a given rpc.
44  * <p/>
45  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
46  * cluster wide information.
47  */
48 public class RpcRegistry extends UntypedActor {
49
50     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51
52     /**
53      * Store to keep the registry. Bucket store sync's it across nodes in the cluster
54      */
55     private ActorRef bucketStore;
56
57     /**
58      * Rpc broker that would use the registry to route requests.
59      */
60     private ActorRef localRouter;
61
62     public RpcRegistry() {
63         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
64
65         log.info("Bucket store path = {}", bucketStore.path().toString());
66     }
67
68     public RpcRegistry(ActorRef bucketStore) {
69         this.bucketStore = bucketStore;
70     }
71
72     @Override
73     public void onReceive(Object message) throws Exception {
74
75         log.debug("Received message: message [{}]", message);
76
77         //TODO: if sender is remote, reject message
78
79         if (message instanceof SetLocalRouter)
80             receiveSetLocalRouter((SetLocalRouter) message);
81
82         if (message instanceof AddOrUpdateRoutes)
83             receiveAddRoutes((AddOrUpdateRoutes) message);
84
85         else if (message instanceof RemoveRoutes)
86             receiveRemoveRoutes((RemoveRoutes) message);
87
88         else if (message instanceof Messages.FindRouters)
89             receiveGetRouter((FindRouters) message);
90
91         else
92             unhandled(message);
93     }
94
95     /**
96      * Register's rpc broker
97      *
98      * @param message contains {@link akka.actor.ActorRef} for rpc broker
99      */
100     private void receiveSetLocalRouter(SetLocalRouter message) {
101         localRouter = message.getRouter();
102     }
103
104     /**
105      * @param msg
106      */
107     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
108
109         Preconditions.checkState(localRouter != null, "Router must be set first");
110
111         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
112         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
113     }
114
115     /**
116      * @param msg contains list of route ids to remove
117      */
118     private void receiveRemoveRoutes(RemoveRoutes msg) {
119
120         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
121         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
122
123     }
124
125     /**
126      * Finds routers for the given rpc.
127      *
128      * @param msg
129      */
130     private void receiveGetRouter(FindRouters msg) {
131         final ActorRef sender = getSender();
132
133         Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
134         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
135     }
136
137     /**
138      * Helper to create empty reply when no routers are found
139      *
140      * @return
141      */
142     private Messages.FindRoutersReply createEmptyReply() {
143         List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
144         return new Messages.FindRoutersReply(routerWithUpdateTime);
145     }
146
147     /**
148      * Helper to create a reply when routers are found for the given rpc
149      *
150      * @param buckets
151      * @param routeId
152      * @return
153      */
154     private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
155
156         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
157         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
158
159         for (Bucket bucket : buckets.values()) {
160
161             RoutingTable table = (RoutingTable) bucket.getData();
162             if (table == null)
163                 continue;
164
165             routerWithUpdateTime = table.getRouterFor(routeId);
166             if (routerWithUpdateTime.isEmpty())
167                 continue;
168
169             routers.add(routerWithUpdateTime.get());
170         }
171
172         return new Messages.FindRoutersReply(routers);
173     }
174
175
176     ///
177     ///private factories to create Mapper
178     ///
179
180     /**
181      * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
182      *
183      * @param routeId the rpc
184      * @param sender  client who asked to find the routers.
185      * @return
186      */
187     private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
188         return new Mapper<Object, Void>() {
189             @Override
190             public Void apply(Object replyMessage) {
191
192                 if (replyMessage instanceof GetAllBucketsReply) {
193
194                     GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
195                     Map<Address, Bucket> buckets = reply.getBuckets();
196
197                     if (buckets == null || buckets.isEmpty()) {
198                         sender.tell(createEmptyReply(), getSelf());
199                         return null;
200                     }
201
202                     sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
203                 }
204                 return null;
205             }
206         };
207     }
208
209     /**
210      * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
211      * it updates the local bucket in bucket store.
212      *
213      * @param routeIds rpc to remote
214      * @return
215      */
216     private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
217         return new Mapper<Object, Void>() {
218             @Override
219             public Void apply(Object replyMessage) {
220                 if (replyMessage instanceof GetLocalBucketReply) {
221
222                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
223                     Bucket<RoutingTable> bucket = reply.getBucket();
224
225                     if (bucket == null) {
226                         log.debug("Local bucket is null");
227                         return null;
228                     }
229
230                     RoutingTable table = bucket.getData();
231                     if (table == null)
232                         table = new RoutingTable();
233
234                     table.setRouter(localRouter);
235
236                     if (!table.isEmpty()) {
237                         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
238                             table.removeRoute(routeId);
239                         }
240                     }
241                     bucket.setData(table);
242
243                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
244                     bucketStore.tell(updateBucketMessage, getSelf());
245                 }
246                 return null;
247             }
248         };
249     }
250
251     /**
252      * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
253      * it updates the local bucket in bucket store.
254      *
255      * @param routeIds rpc to add
256      * @return
257      */
258     private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
259
260         return new Mapper<Object, Void>() {
261             @Override
262             public Void apply(Object replyMessage) {
263                 if (replyMessage instanceof GetLocalBucketReply) {
264
265                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
266                     Bucket<RoutingTable> bucket = reply.getBucket();
267
268                     if (bucket == null) {
269                         log.debug("Local bucket is null");
270                         return null;
271                     }
272
273                     RoutingTable table = bucket.getData();
274                     if (table == null)
275                         table = new RoutingTable();
276
277                     table.setRouter(localRouter);
278                     for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
279                         table.addRoute(routeId);
280                     }
281
282                     bucket.setData(table);
283
284                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
285                     bucketStore.tell(updateBucketMessage, getSelf());
286                 }
287
288                 return null;
289             }
290         };
291     }
292
293     /**
294      * All messages used by the RpcRegistry
295      */
296     public static class Messages {
297
298
299         public static class ContainsRoute {
300             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
301
302             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
303                 Preconditions.checkArgument(routeIdentifiers != null &&
304                                             !routeIdentifiers.isEmpty(),
305                                             "Route Identifiers must be supplied");
306                 this.routeIdentifiers = routeIdentifiers;
307             }
308
309             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
310                 return this.routeIdentifiers;
311             }
312
313             @Override
314             public String toString() {
315                 return "ContainsRoute{" +
316                         "routeIdentifiers=" + routeIdentifiers +
317                         '}';
318             }
319         }
320
321         public static class AddOrUpdateRoutes extends ContainsRoute {
322
323             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
324                 super(routeIdentifiers);
325             }
326         }
327
328         public static class RemoveRoutes extends ContainsRoute {
329
330             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
331                 super(routeIdentifiers);
332             }
333         }
334
335         public static class SetLocalRouter {
336             private final ActorRef router;
337
338             public SetLocalRouter(ActorRef router) {
339                 Preconditions.checkArgument(router != null, "Router must not be null");
340                 this.router = router;
341             }
342
343             public ActorRef getRouter() {
344                 return this.router;
345             }
346
347             @Override
348             public String toString() {
349                 return "SetLocalRouter{" +
350                         "router=" + router +
351                         '}';
352             }
353         }
354
355         public static class FindRouters {
356             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
357
358             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
359                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
360                 this.routeIdentifier = routeIdentifier;
361             }
362
363             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
364                 return routeIdentifier;
365             }
366
367             @Override
368             public String toString() {
369                 return "FindRouters{" +
370                         "routeIdentifier=" + routeIdentifier +
371                         '}';
372             }
373         }
374
375         public static class FindRoutersReply {
376             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
377
378             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
379                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
380                 this.routerWithUpdateTime = routerWithUpdateTime;
381             }
382
383             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
384                 return routerWithUpdateTime;
385             }
386
387             @Override
388             public String toString() {
389                 return "FindRoutersReply{" +
390                         "routerWithUpdateTime=" + routerWithUpdateTime +
391                         '}';
392             }
393         }
394     }
395 }