Merge "Bug 1430: Off-load notifications from single commit thread"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.dispatch.Mapper;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Option;
18 import akka.japi.Pair;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
23 import org.opendaylight.controller.sal.connector.api.RpcRouter;
24 import scala.concurrent.Future;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
39
40 /**
41  * Registry to look up cluster nodes that have registered for a given rpc.
42  * <p>
43  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
44  * cluster wide information.
45  *
46  */
47 public class RpcRegistry extends UntypedActor {
48
49     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
50
51     /**
52      * Store to keep the registry. Bucket store sync's it across nodes in the cluster
53      */
54     private ActorRef bucketStore;
55
56     /**
57      * Rpc broker that would use the registry to route requests.
58      */
59     private ActorRef localRouter;
60
61     public RpcRegistry() {
62         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
63     }
64
65     public RpcRegistry(ActorRef bucketStore) {
66         this.bucketStore = bucketStore;
67     }
68
69     @Override
70     public void onReceive(Object message) throws Exception {
71
72         log.debug("Received message: message [{}]", message);
73
74         //TODO: if sender is remote, reject message
75
76         if (message instanceof SetLocalRouter)
77             receiveSetLocalRouter((SetLocalRouter) message);
78
79         if (message instanceof AddOrUpdateRoute)
80             receiveAddRoute((AddOrUpdateRoute) message);
81
82         else if (message instanceof RemoveRoute)
83             receiveRemoveRoute((RemoveRoute) message);
84
85         else if (message instanceof Messages.FindRouters)
86             receiveGetRouter((Messages.FindRouters) message);
87
88         else
89             unhandled(message);
90     }
91
92     /**
93      * Register's rpc broker
94      *
95      * @param message contains {@link akka.actor.ActorRef} for rpc broker
96      */
97     private void receiveSetLocalRouter(SetLocalRouter message) {
98         if (message == null || message.getRouter() == null)
99             return;//ignore
100
101         localRouter = message.getRouter();
102     }
103
104     /**
105      * //TODO: update this to accept multiple route registration
106      * @param msg
107      */
108     private void receiveAddRoute(AddOrUpdateRoute msg) {
109         if (msg.getRouteIdentifier() == null)
110             return;//ignore
111
112         Preconditions.checkState(localRouter != null, "Router must be set first");
113
114         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
115         futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
116     }
117
118     /**
119      * //TODO: update this to accept multiple routes
120      * @param msg
121      */
122     private void receiveRemoveRoute(RemoveRoute msg) {
123         if (msg.getRouteIdentifier() == null)
124             return;//ignore
125
126         Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
127         futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
128
129     }
130
131     /**
132      * Finds routers for the given rpc.
133      * @param msg
134      */
135     private void receiveGetRouter(Messages.FindRouters msg) {
136         final ActorRef sender = getSender();
137
138         //if empty message, return empty list
139         if (msg.getRouteIdentifier() == null) {
140             sender.tell(createEmptyReply(), getSelf());
141             return;
142         }
143
144         Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
145         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
146
147     }
148
149     /**
150      * Helper to create empty reply when no routers are found
151      *
152      * @return
153      */
154     private Messages.FindRoutersReply createEmptyReply() {
155         List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
156         return new Messages.FindRoutersReply(routerWithUpdateTime);
157     }
158
159     /**
160      * Helper to create a reply when routers are found for the given rpc
161      * @param buckets
162      * @param routeId
163      * @return
164      */
165     private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
166
167         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
168
169         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
170
171         for (Bucket bucket : buckets.values()) {
172
173             RoutingTable table = (RoutingTable) bucket.getData();
174
175             if (table == null)
176                 continue;
177
178             routerWithUpdateTime = table.getRouterFor(routeId);
179
180             if (routerWithUpdateTime.isEmpty())
181                 continue;
182
183             routers.add(routerWithUpdateTime.get());
184         }
185
186         return new Messages.FindRoutersReply(routers);
187     }
188
189
190     ///
191     ///private factories to create Mapper
192     ///
193
194     /**
195      *  Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
196      *
197      * @param routeId the rpc
198      * @param sender  client who asked to find the routers.
199      * @return
200      */
201     private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
202         return new Mapper<Object, Void>() {
203             @Override
204             public Void apply(Object replyMessage) {
205
206                 if (replyMessage instanceof GetAllBucketsReply) {
207
208                     GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
209                     Map<Address, Bucket> buckets = reply.getBuckets();
210
211                     if (buckets == null || buckets.isEmpty()) {
212                         sender.tell(createEmptyReply(), getSelf());
213                         return null;
214                     }
215
216                     sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
217                 }
218                 return null;
219             }
220         };
221     }
222
223     /**
224      * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
225      * it updates the local bucket in bucket store.
226      *
227      * @param routeId rpc to remote
228      * @return
229      */
230     private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
231         return new Mapper<Object, Void>() {
232             @Override
233             public Void apply(Object replyMessage) {
234                 if (replyMessage instanceof GetLocalBucketReply) {
235
236                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
237                     Bucket<RoutingTable> bucket = reply.getBucket();
238
239                     if (bucket == null) {
240                         log.debug("Local bucket is null");
241                         return null;
242                     }
243
244                     RoutingTable table = bucket.getData();
245                     if (table == null)
246                         table = new RoutingTable();
247
248                     table.setRouter(localRouter);
249                     table.removeRoute(routeId);
250
251                     bucket.setData(table);
252
253                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
254                     bucketStore.tell(updateBucketMessage, getSelf());
255                 }
256                 return null;
257             }
258         };
259     }
260
261     /**
262      * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
263      * it updates the local bucket in bucket store.
264      *
265      * @param routeId rpc to add
266      * @return
267      */
268     private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
269
270         return new Mapper<Object, Void>() {
271             @Override
272             public Void apply(Object replyMessage) {
273                 if (replyMessage instanceof GetLocalBucketReply) {
274
275                     GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
276                     Bucket<RoutingTable> bucket = reply.getBucket();
277
278                     if (bucket == null) {
279                         log.debug("Local bucket is null");
280                         return null;
281                     }
282
283                     RoutingTable table = bucket.getData();
284                     if (table == null)
285                         table = new RoutingTable();
286
287                     table.setRouter(localRouter);
288                     table.addRoute(routeId);
289
290                     bucket.setData(table);
291
292                     UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
293                     bucketStore.tell(updateBucketMessage, getSelf());
294                 }
295
296                 return null;
297             }
298         };
299     }
300
301     /**
302      * All messages used by the RpcRegistry
303      */
304     public static class Messages {
305
306
307         public static class ContainsRoute {
308             final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
309
310             public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
311                 Preconditions.checkArgument(routeIdentifier != null);
312                 this.routeIdentifier = routeIdentifier;
313             }
314
315             public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
316                 return this.routeIdentifier;
317             }
318
319             @Override
320             public String toString() {
321                 return this.getClass().getSimpleName() + "{" +
322                         "routeIdentifier=" + routeIdentifier +
323                         '}';
324             }
325         }
326
327         public static class AddOrUpdateRoute extends ContainsRoute{
328
329             public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
330                 super(routeIdentifier);
331             }
332         }
333
334         public static class RemoveRoute extends ContainsRoute {
335
336             public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
337                 super(routeIdentifier);
338             }
339         }
340
341         public static class SetLocalRouter{
342             private final ActorRef router;
343
344             public SetLocalRouter(ActorRef router) {
345                 this.router = router;
346             }
347
348             public ActorRef getRouter(){
349                 return this.router;
350             }
351
352             @Override
353             public String toString() {
354                 return "SetLocalRouter{" +
355                         "router=" + router +
356                         '}';
357             }
358         }
359
360         public static class FindRouters extends ContainsRoute {
361             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
362                 super(routeIdentifier);
363             }
364         }
365
366         public static class FindRoutersReply {
367             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
368
369             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
370                 this.routerWithUpdateTime = routerWithUpdateTime;
371             }
372
373             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
374                 return routerWithUpdateTime;
375             }
376
377             @Override
378             public String toString() {
379                 return "FindRoutersReply{" +
380                         "routerWithUpdateTime=" + routerWithUpdateTime +
381                         '}';
382             }
383         }
384     }
385 }