7c5efc29e8640c6028ca829fd022b12aea010400
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.japi.Option;
15 import akka.japi.Pair;
16 import com.google.common.base.Preconditions;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
25 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
30 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
31 import org.opendaylight.controller.sal.connector.api.RpcRouter;
32 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
33 import scala.concurrent.duration.FiniteDuration;
34
35 /**
36  * Registry to look up cluster nodes that have registered for a given rpc.
37  * <p/>
38  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
39  * cluster wide information.
40  */
41 public class RpcRegistry extends BucketStore<RoutingTable> {
42     private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
43     private final FiniteDuration findRouterTimeout;
44
45     public RpcRegistry(RemoteRpcProviderConfig config) {
46         super(config);
47         getLocalBucket().setData(new RoutingTable());
48         findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
49     }
50
51     public static Props props(RemoteRpcProviderConfig config) {
52         return Props.create(new RpcRegistryCreator(config));
53     }
54
55     @Override
56     protected void handleReceive(Object message) throws Exception {
57         //TODO: if sender is remote, reject message
58
59         if (message instanceof SetLocalRouter) {
60             receiveSetLocalRouter((SetLocalRouter) message);
61         } else if (message instanceof AddOrUpdateRoutes) {
62             receiveAddRoutes((AddOrUpdateRoutes) message);
63         } else if (message instanceof RemoveRoutes) {
64             receiveRemoveRoutes((RemoveRoutes) message);
65         } else if (message instanceof Messages.FindRouters) {
66             receiveGetRouter((FindRouters) message);
67         } else if (message instanceof Runnable) {
68             ((Runnable)message).run();
69         } else {
70             super.handleReceive(message);
71         }
72     }
73
74     /**
75      * Registers a rpc broker.
76      *
77      * @param message contains {@link akka.actor.ActorRef} for rpc broker
78      */
79     private void receiveSetLocalRouter(SetLocalRouter message) {
80         getLocalBucket().getData().setRouter(message.getRouter());
81     }
82
83     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
84
85         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
86
87         RoutingTable table = getLocalBucket().getData().copy();
88         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
89             table.addRoute(routeId);
90         }
91
92         updateLocalBucket(table);
93
94         onBucketsUpdated();
95     }
96
97     /**
98      * Processes a RemoveRoutes message.
99      *
100      * @param msg contains list of route ids to remove
101      */
102     private void receiveRemoveRoutes(RemoveRoutes msg) {
103
104         RoutingTable table = getLocalBucket().getData().copy();
105         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
106             table.removeRoute(routeId);
107         }
108
109         updateLocalBucket(table);
110     }
111
112     /**
113      * Finds routers for the given rpc.
114      *
115      * @param findRouters the FindRouters request
116      */
117     private void receiveGetRouter(final FindRouters findRouters) {
118         log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
119
120         final ActorRef sender = getSender();
121         if (!findRouters(findRouters, sender)) {
122             log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
123                     findRouterTimeout.toMillis());
124
125             final AtomicReference<Cancellable> timer = new AtomicReference<>();
126             final Runnable routesUpdatedRunnable = new Runnable() {
127                 @Override
128                 public void run() {
129                     if (findRouters(findRouters, sender)) {
130                         routesUpdatedCallbacks.remove(this);
131                         timer.get().cancel();
132                     }
133                 }
134             };
135
136             routesUpdatedCallbacks.add(routesUpdatedRunnable);
137
138             Runnable timerRunnable = () -> {
139                 log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
140
141                 routesUpdatedCallbacks.remove(routesUpdatedRunnable);
142                 sender.tell(new Messages.FindRoutersReply(
143                         Collections.<Pair<ActorRef, Long>>emptyList()), self());
144             };
145
146             timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
147                     getContext().dispatcher(), self()));
148         }
149     }
150
151     private boolean findRouters(FindRouters findRouters, ActorRef sender) {
152         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
153
154         RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
155         findRoutes(getLocalBucket().getData(), routeId, routers);
156
157         for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
158             findRoutes(bucket.getData(), routeId, routers);
159         }
160
161         log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
162
163         boolean foundRouters = !routers.isEmpty();
164         if (foundRouters) {
165             sender.tell(new Messages.FindRoutersReply(routers), getSelf());
166         }
167
168         return foundRouters;
169     }
170
171     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
172             List<Pair<ActorRef, Long>> routers) {
173         if (table == null) {
174             return;
175         }
176
177         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
178         if (!routerWithUpdateTime.isEmpty()) {
179             routers.add(routerWithUpdateTime.get());
180         }
181     }
182
183     @Override
184     protected void onBucketsUpdated() {
185         if (routesUpdatedCallbacks.isEmpty()) {
186             return;
187         }
188
189         for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
190             callBack.run();
191         }
192     }
193
194     /**
195      * All messages used by the RpcRegistry.
196      */
197     public static class Messages {
198
199
200         public static class ContainsRoute {
201             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
202
203             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
204                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
205                         "Route Identifiers must be supplied");
206                 this.routeIdentifiers = routeIdentifiers;
207             }
208
209             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
210                 return this.routeIdentifiers;
211             }
212
213             @Override
214             public String toString() {
215                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
216             }
217         }
218
219         public static class AddOrUpdateRoutes extends ContainsRoute {
220
221             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
222                 super(routeIdentifiers);
223             }
224         }
225
226         public static class RemoveRoutes extends ContainsRoute {
227
228             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
229                 super(routeIdentifiers);
230             }
231         }
232
233         public static class SetLocalRouter {
234             private final ActorRef router;
235
236             public SetLocalRouter(ActorRef router) {
237                 Preconditions.checkArgument(router != null, "Router must not be null");
238                 this.router = router;
239             }
240
241             public ActorRef getRouter() {
242                 return this.router;
243             }
244
245             @Override
246             public String toString() {
247                 return "SetLocalRouter{" + "router=" + router + '}';
248             }
249         }
250
251         public static class FindRouters {
252             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
253
254             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
255                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
256                 this.routeIdentifier = routeIdentifier;
257             }
258
259             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
260                 return routeIdentifier;
261             }
262
263             @Override
264             public String toString() {
265                 return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
266             }
267         }
268
269         public static class FindRoutersReply {
270             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
271
272             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
273                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
274                 this.routerWithUpdateTime = routerWithUpdateTime;
275             }
276
277             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
278                 return routerWithUpdateTime;
279             }
280
281             @Override
282             public String toString() {
283                 return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
284             }
285         }
286     }
287
288     private static class RpcRegistryCreator implements Creator<RpcRegistry> {
289         private static final long serialVersionUID = 1L;
290         private final RemoteRpcProviderConfig config;
291
292         private RpcRegistryCreator(RemoteRpcProviderConfig config) {
293             this.config = config;
294         }
295
296         @Override
297         public RpcRegistry create() throws Exception {
298             RpcRegistry registry =  new RpcRegistry(config);
299             new RemoteRpcRegistryMXBeanImpl(registry);
300             return registry;
301         }
302     }
303 }