Merge "BUG 1582 - PUT fails validate outer object name"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.dispatch.Mapper;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Option;
17 import akka.japi.Pair;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Preconditions;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
24 import org.opendaylight.controller.sal.connector.api.RpcRouter;
25 import scala.concurrent.Future;
26
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Map;
31
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
35 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
41
42 /**
43  * Registry to look up cluster nodes that have registered for a given rpc.
44  * <p/>
45  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
46  * cluster wide information.
47  */
48 public class RpcRegistry extends AbstractUntypedActorWithMetering {
49
50     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51
52     /**
53      * Store to keep the registry. Bucket store sync's it across nodes in the cluster
54      */
55     private ActorRef bucketStore;
56
57     /**
58      * Rpc broker that would use the registry to route requests.
59      */
60     private ActorRef localRouter;
61
62     private RemoteRpcProviderConfig config;
63
64     public RpcRegistry() {
65         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
66         this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
67         log.info("Bucket store path = {}", bucketStore.path().toString());
68     }
69
70     public RpcRegistry(ActorRef bucketStore) {
71         this.bucketStore = bucketStore;
72     }
73
74
75     @Override
76     protected void handleReceive(Object message) throws Exception {
77         //TODO: if sender is remote, reject message
78
79         if (message instanceof SetLocalRouter)
80             receiveSetLocalRouter((SetLocalRouter) message);
81
82         if (message instanceof AddOrUpdateRoutes)
83             receiveAddRoutes((AddOrUpdateRoutes) message);
84
85         else if (message instanceof RemoveRoutes)
86             receiveRemoveRoutes((RemoveRoutes) message);
87
88         else if (message instanceof Messages.FindRouters)
89             receiveGetRouter((FindRouters) message);
90
91         else
92             unhandled(message);
93     }
94
95     /**
96      * Register's rpc broker
97      *
98      * @param message contains {@link akka.actor.ActorRef} for rpc broker
99      */
100     private void receiveSetLocalRouter(SetLocalRouter message) {
101         localRouter = message.getRouter();
102     }
103
104     /**
105      * @param msg
106      */
107     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
108
109         Preconditions.checkState(localRouter != null, "Router must be set first");
110
111         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
112         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
113     }
114
115     /**
116      * @param msg contains list of route ids to remove
117      */
118     private void receiveRemoveRoutes(RemoveRoutes msg) {
119
120         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
121         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
122
123     }
124
125     /**
126      * Finds routers for the given rpc.
127      *
128      * @param msg
129      */
130     private void receiveGetRouter(FindRouters msg) {
131         final ActorRef sender = getSender();
132
133         Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
134         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
135     }
136
137     /**
138      * Helper to create empty reply when no routers are found
139      *
140      * @return
141      */
142     private Messages.FindRoutersReply createEmptyReply() {
143         List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
144         return new Messages.FindRoutersReply(routerWithUpdateTime);
145     }
146
147     /**
148      * Helper to create a reply when routers are found for the given rpc
149      *
150      * @param buckets
151      * @param routeId
152      * @return
153      */
154     private Messages.FindRoutersReply createReplyWithRouters(
155             Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
156
157         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
158         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
159
160         for (Bucket bucket : buckets.values()) {
161
162             RoutingTable table = (RoutingTable) bucket.getData();
163             if (table == null)
164                 continue;
165
166             routerWithUpdateTime = table.getRouterFor(routeId);
167             if (routerWithUpdateTime.isEmpty())
168                 continue;
169
170             routers.add(routerWithUpdateTime.get());
171         }
172
173         return new Messages.FindRoutersReply(routers);
174     }
175
176
177     ///
178     ///private factories to create Mapper
179     ///
180
181     /**
182      * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
183      *
184      * @param routeId the rpc
185      * @param sender  client who asked to find the routers.
186      * @return
187      */
188     private Mapper<Object, Void> getMapperToGetRouter(
189             final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
190         return new Mapper<Object, Void>() {
191             @Override
192             public Void apply(Object replyMessage) {
193
194                 if (replyMessage instanceof GetAllBucketsReply) {
195
196                     GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
197                     Map<Address, Bucket> buckets = reply.getBuckets();
198
199                     if (buckets == null || buckets.isEmpty()) {
200                         sender.tell(createEmptyReply(), getSelf());
201                         return null;
202                     }
203
204                     sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
205                 }
206                 return null;
207             }
208         };
209     }
210
211     /**
212      * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
213      * it updates the local bucket in bucket store.
214      *
215      * @param routeIds rpc to remote
216      * @return
217      */
218     private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
219         return new Mapper<Object, Void>() {
220             @Override
221             public Void apply(Object replyMessage) {
222                 if (replyMessage instanceof GetLocalBucketReply) {
223
224                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
225                     Bucket<RoutingTable> bucket = reply.getBucket();
226
227                     if (bucket == null) {
228                         log.debug("Local bucket is null");
229                         return null;
230                     }
231
232                     RoutingTable table = bucket.getData();
233                     if (table == null)
234                         table = new RoutingTable();
235
236                     table.setRouter(localRouter);
237
238                     if (!table.isEmpty()) {
239                         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
240                             table.removeRoute(routeId);
241                         }
242                     }
243                     bucket.setData(table);
244
245                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
246                     bucketStore.tell(updateBucketMessage, getSelf());
247                 }
248                 return null;
249             }
250         };
251     }
252
253     /**
254      * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
255      * it updates the local bucket in bucket store.
256      *
257      * @param routeIds rpc to add
258      * @return
259      */
260     private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
261
262         return new Mapper<Object, Void>() {
263             @Override
264             public Void apply(Object replyMessage) {
265                 if (replyMessage instanceof GetLocalBucketReply) {
266
267                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
268                     Bucket<RoutingTable> bucket = reply.getBucket();
269
270                     if (bucket == null) {
271                         log.debug("Local bucket is null");
272                         return null;
273                     }
274
275                     RoutingTable table = bucket.getData();
276                     if (table == null)
277                         table = new RoutingTable();
278
279                     table.setRouter(localRouter);
280                     for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
281                         table.addRoute(routeId);
282                     }
283
284                     bucket.setData(table);
285
286                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
287                     bucketStore.tell(updateBucketMessage, getSelf());
288                 }
289
290                 return null;
291             }
292         };
293     }
294
295     /**
296      * All messages used by the RpcRegistry
297      */
298     public static class Messages {
299
300
301         public static class ContainsRoute {
302             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
303
304             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
305                 Preconditions.checkArgument(routeIdentifiers != null &&
306                                             !routeIdentifiers.isEmpty(),
307                                             "Route Identifiers must be supplied");
308                 this.routeIdentifiers = routeIdentifiers;
309             }
310
311             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
312                 return this.routeIdentifiers;
313             }
314
315             @Override
316             public String toString() {
317                 return "ContainsRoute{" +
318                         "routeIdentifiers=" + routeIdentifiers +
319                         '}';
320             }
321         }
322
323         public static class AddOrUpdateRoutes extends ContainsRoute {
324
325             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
326                 super(routeIdentifiers);
327             }
328         }
329
330         public static class RemoveRoutes extends ContainsRoute {
331
332             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
333                 super(routeIdentifiers);
334             }
335         }
336
337         public static class SetLocalRouter {
338             private final ActorRef router;
339
340             public SetLocalRouter(ActorRef router) {
341                 Preconditions.checkArgument(router != null, "Router must not be null");
342                 this.router = router;
343             }
344
345             public ActorRef getRouter() {
346                 return this.router;
347             }
348
349             @Override
350             public String toString() {
351                 return "SetLocalRouter{" +
352                         "router=" + router +
353                         '}';
354             }
355         }
356
357         public static class FindRouters {
358             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
359
360             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
361                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
362                 this.routeIdentifier = routeIdentifier;
363             }
364
365             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
366                 return routeIdentifier;
367             }
368
369             @Override
370             public String toString() {
371                 return "FindRouters{" +
372                         "routeIdentifier=" + routeIdentifier +
373                         '}';
374             }
375         }
376
377         public static class FindRoutersReply {
378             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
379
380             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
381                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
382                 this.routerWithUpdateTime = routerWithUpdateTime;
383             }
384
385             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
386                 return routerWithUpdateTime;
387             }
388
389             @Override
390             public String toString() {
391                 return "FindRoutersReply{" +
392                         "routerWithUpdateTime=" + routerWithUpdateTime +
393                         '}';
394             }
395         }
396     }
397 }