43f5dbcdc0ea8d71d77afec172f4f5328cd66e5d
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.japi.Option;
15 import akka.japi.Pair;
16 import com.google.common.base.Preconditions;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
25 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
30 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
31 import org.opendaylight.controller.sal.connector.api.RpcRouter;
32 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
33 import scala.concurrent.duration.FiniteDuration;
34
35 /**
36  * Registry to look up cluster nodes that have registered for a given rpc.
37  *
38  * <p>
39  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
40  * cluster wide information.
41  */
42 public class RpcRegistry extends BucketStore<RoutingTable> {
43     private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
44     private final FiniteDuration findRouterTimeout;
45
46     public RpcRegistry(RemoteRpcProviderConfig config) {
47         super(config);
48         getLocalBucket().setData(new RoutingTable());
49         findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
50     }
51
52     public static Props props(RemoteRpcProviderConfig config) {
53         return Props.create(new RpcRegistryCreator(config));
54     }
55
56     @Override
57     protected void handleReceive(Object message) throws Exception {
58         //TODO: if sender is remote, reject message
59
60         if (message instanceof SetLocalRouter) {
61             receiveSetLocalRouter((SetLocalRouter) message);
62         } else if (message instanceof AddOrUpdateRoutes) {
63             receiveAddRoutes((AddOrUpdateRoutes) message);
64         } else if (message instanceof RemoveRoutes) {
65             receiveRemoveRoutes((RemoveRoutes) message);
66         } else if (message instanceof Messages.FindRouters) {
67             receiveGetRouter((FindRouters) message);
68         } else if (message instanceof Runnable) {
69             ((Runnable)message).run();
70         } else {
71             super.handleReceive(message);
72         }
73     }
74
75     /**
76      * Registers a rpc broker.
77      *
78      * @param message contains {@link akka.actor.ActorRef} for rpc broker
79      */
80     private void receiveSetLocalRouter(SetLocalRouter message) {
81         getLocalBucket().getData().setRouter(message.getRouter());
82     }
83
84     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
85
86         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
87
88         RoutingTable table = getLocalBucket().getData().copy();
89         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
90             table.addRoute(routeId);
91         }
92
93         updateLocalBucket(table);
94
95         onBucketsUpdated();
96     }
97
98     /**
99      * Processes a RemoveRoutes message.
100      *
101      * @param msg contains list of route ids to remove
102      */
103     private void receiveRemoveRoutes(RemoveRoutes msg) {
104
105         RoutingTable table = getLocalBucket().getData().copy();
106         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
107             table.removeRoute(routeId);
108         }
109
110         updateLocalBucket(table);
111     }
112
113     /**
114      * Finds routers for the given rpc.
115      *
116      * @param findRouters the FindRouters request
117      */
118     private void receiveGetRouter(final FindRouters findRouters) {
119         log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
120
121         final ActorRef sender = getSender();
122         if (!findRouters(findRouters, sender)) {
123             log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
124                     findRouterTimeout.toMillis());
125
126             final AtomicReference<Cancellable> timer = new AtomicReference<>();
127             final Runnable routesUpdatedRunnable = new Runnable() {
128                 @Override
129                 public void run() {
130                     if (findRouters(findRouters, sender)) {
131                         routesUpdatedCallbacks.remove(this);
132                         timer.get().cancel();
133                     }
134                 }
135             };
136
137             routesUpdatedCallbacks.add(routesUpdatedRunnable);
138
139             Runnable timerRunnable = () -> {
140                 log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
141
142                 routesUpdatedCallbacks.remove(routesUpdatedRunnable);
143                 sender.tell(new Messages.FindRoutersReply(
144                         Collections.<Pair<ActorRef, Long>>emptyList()), self());
145             };
146
147             timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
148                     getContext().dispatcher(), self()));
149         }
150     }
151
152     private boolean findRouters(FindRouters findRouters, ActorRef sender) {
153         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
154
155         RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
156         findRoutes(getLocalBucket().getData(), routeId, routers);
157
158         for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
159             findRoutes(bucket.getData(), routeId, routers);
160         }
161
162         log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
163
164         boolean foundRouters = !routers.isEmpty();
165         if (foundRouters) {
166             sender.tell(new Messages.FindRoutersReply(routers), getSelf());
167         }
168
169         return foundRouters;
170     }
171
172     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
173             List<Pair<ActorRef, Long>> routers) {
174         if (table == null) {
175             return;
176         }
177
178         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
179         if (!routerWithUpdateTime.isEmpty()) {
180             routers.add(routerWithUpdateTime.get());
181         }
182     }
183
184     @Override
185     protected void onBucketsUpdated() {
186         if (routesUpdatedCallbacks.isEmpty()) {
187             return;
188         }
189
190         for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
191             callBack.run();
192         }
193     }
194
195     /**
196      * All messages used by the RpcRegistry.
197      */
198     public static class Messages {
199
200
201         public static class ContainsRoute {
202             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
203
204             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
205                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
206                         "Route Identifiers must be supplied");
207                 this.routeIdentifiers = routeIdentifiers;
208             }
209
210             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
211                 return this.routeIdentifiers;
212             }
213
214             @Override
215             public String toString() {
216                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
217             }
218         }
219
220         public static class AddOrUpdateRoutes extends ContainsRoute {
221
222             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
223                 super(routeIdentifiers);
224             }
225         }
226
227         public static class RemoveRoutes extends ContainsRoute {
228
229             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
230                 super(routeIdentifiers);
231             }
232         }
233
234         public static class SetLocalRouter {
235             private final ActorRef router;
236
237             public SetLocalRouter(ActorRef router) {
238                 Preconditions.checkArgument(router != null, "Router must not be null");
239                 this.router = router;
240             }
241
242             public ActorRef getRouter() {
243                 return this.router;
244             }
245
246             @Override
247             public String toString() {
248                 return "SetLocalRouter{" + "router=" + router + '}';
249             }
250         }
251
252         public static class FindRouters {
253             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
254
255             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
256                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
257                 this.routeIdentifier = routeIdentifier;
258             }
259
260             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
261                 return routeIdentifier;
262             }
263
264             @Override
265             public String toString() {
266                 return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
267             }
268         }
269
270         public static class FindRoutersReply {
271             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
272
273             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
274                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
275                 this.routerWithUpdateTime = routerWithUpdateTime;
276             }
277
278             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
279                 return routerWithUpdateTime;
280             }
281
282             @Override
283             public String toString() {
284                 return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
285             }
286         }
287     }
288
289     private static class RpcRegistryCreator implements Creator<RpcRegistry> {
290         private static final long serialVersionUID = 1L;
291         private final RemoteRpcProviderConfig config;
292
293         private RpcRegistryCreator(RemoteRpcProviderConfig config) {
294             this.config = config;
295         }
296
297         @Override
298         public RpcRegistry create() throws Exception {
299             RpcRegistry registry =  new RpcRegistry(config);
300             new RemoteRpcRegistryMXBeanImpl(registry);
301             return registry;
302         }
303     }
304 }