1e481bc311b3f30d2bfc75346077253a871bdff0
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.japi.Option;
15 import akka.japi.Pair;
16 import com.google.common.base.Preconditions;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
25 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
30 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
31 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
32 import org.opendaylight.controller.sal.connector.api.RpcRouter;
33 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
34 import scala.concurrent.duration.FiniteDuration;
35
36 /**
37  * Registry to look up cluster nodes that have registered for a given rpc.
38  * <p/>
39  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
40  * cluster wide information.
41  */
42 public class RpcRegistry extends BucketStore<RoutingTable> {
43     private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
44     private final FiniteDuration findRouterTimeout;
45
46     public RpcRegistry(RemoteRpcProviderConfig config) {
47         super(config);
48         getLocalBucket().setData(new RoutingTable());
49         findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
50     }
51
52     public static Props props(RemoteRpcProviderConfig config) {
53         return Props.create(new RpcRegistryCreator(config));
54     }
55
56     @Override
57     protected void handleReceive(Object message) throws Exception {
58         //TODO: if sender is remote, reject message
59
60         if (message instanceof SetLocalRouter) {
61             receiveSetLocalRouter((SetLocalRouter) message);
62         } else if (message instanceof AddOrUpdateRoutes) {
63             receiveAddRoutes((AddOrUpdateRoutes) message);
64         } else if (message instanceof RemoveRoutes) {
65             receiveRemoveRoutes((RemoveRoutes) message);
66         } else if (message instanceof Messages.FindRouters) {
67             receiveGetRouter((FindRouters) message);
68         } else if (message instanceof Runnable) {
69             ((Runnable)message).run();
70         } else {
71             super.handleReceive(message);
72         }
73     }
74
75     /**
76      * Register's rpc broker
77      *
78      * @param message contains {@link akka.actor.ActorRef} for rpc broker
79      */
80     private void receiveSetLocalRouter(SetLocalRouter message) {
81         getLocalBucket().getData().setRouter(message.getRouter());
82     }
83
84     /**
85      * @param msg
86      */
87     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
88
89         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
90
91         RoutingTable table = getLocalBucket().getData().copy();
92         for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
93             table.addRoute(routeId);
94         }
95
96         updateLocalBucket(table);
97
98         onBucketsUpdated();
99     }
100
101     /**
102      * @param msg contains list of route ids to remove
103      */
104     private void receiveRemoveRoutes(RemoveRoutes msg) {
105
106         RoutingTable table = getLocalBucket().getData().copy();
107         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
108             table.removeRoute(routeId);
109         }
110
111         updateLocalBucket(table);
112     }
113
114     /**
115      * Finds routers for the given rpc.
116      *
117      * @param findRouters
118      */
119     private void receiveGetRouter(final FindRouters findRouters) {
120         log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
121
122         final ActorRef sender = getSender();
123         if(!findRouters(findRouters, sender)) {
124             log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
125                     findRouterTimeout.toMillis());
126
127             final AtomicReference<Cancellable> timer = new AtomicReference<>();
128             final Runnable routesUpdatedRunnable = new Runnable() {
129                 @Override
130                 public void run() {
131                     if(findRouters(findRouters, sender)) {
132                         routesUpdatedCallbacks.remove(this);
133                         timer.get().cancel();
134                     }
135                 }
136             };
137
138             routesUpdatedCallbacks.add(routesUpdatedRunnable);
139
140             Runnable timerRunnable = new Runnable() {
141                 @Override
142                 public void run() {
143                     log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
144
145                     routesUpdatedCallbacks.remove(routesUpdatedRunnable);
146                     sender.tell(new Messages.FindRoutersReply(
147                             Collections.<Pair<ActorRef, Long>>emptyList()), self());
148                 }
149             };
150
151             timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
152                     getContext().dispatcher(), self()));
153         }
154     }
155
156     private boolean findRouters(FindRouters findRouters, ActorRef sender) {
157         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
158
159         RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
160         findRoutes(getLocalBucket().getData(), routeId, routers);
161
162         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
163             findRoutes(bucket.getData(), routeId, routers);
164         }
165
166         log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
167
168         boolean foundRouters = !routers.isEmpty();
169         if(foundRouters) {
170             sender.tell(new Messages.FindRoutersReply(routers), getSelf());
171         }
172
173         return foundRouters;
174     }
175
176     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
177             List<Pair<ActorRef, Long>> routers) {
178         if (table == null) {
179             return;
180         }
181
182         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
183         if(!routerWithUpdateTime.isEmpty()) {
184             routers.add(routerWithUpdateTime.get());
185         }
186     }
187
188     @Override
189     protected void onBucketsUpdated() {
190         if(routesUpdatedCallbacks.isEmpty()) {
191             return;
192         }
193
194         for(Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
195             callBack.run();
196         }
197     }
198
199     /**
200      * All messages used by the RpcRegistry
201      */
202     public static class Messages {
203
204
205         public static class ContainsRoute {
206             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
207
208             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
209                 Preconditions.checkArgument(routeIdentifiers != null &&
210                                             !routeIdentifiers.isEmpty(),
211                                             "Route Identifiers must be supplied");
212                 this.routeIdentifiers = routeIdentifiers;
213             }
214
215             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
216                 return this.routeIdentifiers;
217             }
218
219             @Override
220             public String toString() {
221                 return "ContainsRoute{" +
222                         "routeIdentifiers=" + routeIdentifiers +
223                         '}';
224             }
225         }
226
227         public static class AddOrUpdateRoutes extends ContainsRoute {
228
229             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
230                 super(routeIdentifiers);
231             }
232         }
233
234         public static class RemoveRoutes extends ContainsRoute {
235
236             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
237                 super(routeIdentifiers);
238             }
239         }
240
241         public static class SetLocalRouter {
242             private final ActorRef router;
243
244             public SetLocalRouter(ActorRef router) {
245                 Preconditions.checkArgument(router != null, "Router must not be null");
246                 this.router = router;
247             }
248
249             public ActorRef getRouter() {
250                 return this.router;
251             }
252
253             @Override
254             public String toString() {
255                 return "SetLocalRouter{" +
256                         "router=" + router +
257                         '}';
258             }
259         }
260
261         public static class FindRouters {
262             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
263
264             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
265                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
266                 this.routeIdentifier = routeIdentifier;
267             }
268
269             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
270                 return routeIdentifier;
271             }
272
273             @Override
274             public String toString() {
275                 return "FindRouters{" +
276                         "routeIdentifier=" + routeIdentifier +
277                         '}';
278             }
279         }
280
281         public static class FindRoutersReply {
282             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
283
284             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
285                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
286                 this.routerWithUpdateTime = routerWithUpdateTime;
287             }
288
289             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
290                 return routerWithUpdateTime;
291             }
292
293             @Override
294             public String toString() {
295                 return "FindRoutersReply{" +
296                         "routerWithUpdateTime=" + routerWithUpdateTime +
297                         '}';
298             }
299         }
300     }
301
302     private static class RpcRegistryCreator implements Creator<RpcRegistry> {
303         private static final long serialVersionUID = 1L;
304         private final RemoteRpcProviderConfig config;
305
306         private RpcRegistryCreator(RemoteRpcProviderConfig config) {
307             this.config = config;
308         }
309
310         @Override
311         public RpcRegistry create() throws Exception {
312             RpcRegistry registry =  new RpcRegistry(config);
313             RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
314             return registry;
315         }
316     }
317 }