BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Props;
13 import akka.japi.Option;
14 import akka.japi.Pair;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.concurrent.atomic.AtomicReference;
22 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
23 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
25 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
29 import org.opendaylight.controller.sal.connector.api.RpcRouter;
30 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
31 import scala.concurrent.duration.FiniteDuration;
32
33 /**
34  * Registry to look up cluster nodes that have registered for a given rpc.
35  *
36  * <p>
37  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
38  * cluster wide information.
39  */
40 public class RpcRegistry extends BucketStore<RoutingTable> {
41     private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
42     private final FiniteDuration findRouterTimeout;
43
44     public RpcRegistry(RemoteRpcProviderConfig config) {
45         super(config);
46         getLocalBucket().setData(new RoutingTable());
47         findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
48     }
49
50     public static Props props(RemoteRpcProviderConfig config) {
51         return Props.create(RpcRegistry.class, config);
52     }
53
54     @Override
55     protected void handleReceive(Object message) throws Exception {
56         //TODO: if sender is remote, reject message
57
58         if (message instanceof SetLocalRouter) {
59             receiveSetLocalRouter((SetLocalRouter) message);
60         } else if (message instanceof AddOrUpdateRoutes) {
61             receiveAddRoutes((AddOrUpdateRoutes) message);
62         } else if (message instanceof RemoveRoutes) {
63             receiveRemoveRoutes((RemoveRoutes) message);
64         } else if (message instanceof Messages.FindRouters) {
65             receiveGetRouter((FindRouters) message);
66         } else if (message instanceof Runnable) {
67             ((Runnable)message).run();
68         } else {
69             super.handleReceive(message);
70         }
71     }
72
73     /**
74      * Registers a rpc broker.
75      *
76      * @param message contains {@link akka.actor.ActorRef} for rpc broker
77      */
78     private void receiveSetLocalRouter(SetLocalRouter message) {
79         getLocalBucket().getData().setRouter(message.getRouter());
80     }
81
82     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
83
84         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
85
86         RoutingTable table = getLocalBucket().getData().copy();
87         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
88             table.addRoute(routeId);
89         }
90
91         updateLocalBucket(table);
92
93         onBucketsUpdated();
94     }
95
96     /**
97      * Processes a RemoveRoutes message.
98      *
99      * @param msg contains list of route ids to remove
100      */
101     private void receiveRemoveRoutes(RemoveRoutes msg) {
102
103         RoutingTable table = getLocalBucket().getData().copy();
104         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
105             table.removeRoute(routeId);
106         }
107
108         updateLocalBucket(table);
109     }
110
111     /**
112      * Finds routers for the given rpc.
113      *
114      * @param findRouters the FindRouters request
115      */
116     private void receiveGetRouter(final FindRouters findRouters) {
117         log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
118
119         final ActorRef sender = getSender();
120         if (!findRouters(findRouters, sender)) {
121             log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
122                     findRouterTimeout.toMillis());
123
124             final AtomicReference<Cancellable> timer = new AtomicReference<>();
125             final Runnable routesUpdatedRunnable = new Runnable() {
126                 @Override
127                 public void run() {
128                     if (findRouters(findRouters, sender)) {
129                         routesUpdatedCallbacks.remove(this);
130                         timer.get().cancel();
131                     }
132                 }
133             };
134
135             routesUpdatedCallbacks.add(routesUpdatedRunnable);
136
137             Runnable timerRunnable = () -> {
138                 log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
139
140                 routesUpdatedCallbacks.remove(routesUpdatedRunnable);
141                 sender.tell(new Messages.FindRoutersReply(
142                         Collections.<Pair<ActorRef, Long>>emptyList()), self());
143             };
144
145             timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
146                     getContext().dispatcher(), self()));
147         }
148     }
149
150     private boolean findRouters(FindRouters findRouters, ActorRef sender) {
151         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
152
153         RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
154         findRoutes(getLocalBucket().getData(), routeId, routers);
155
156         for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
157             findRoutes(bucket.getData(), routeId, routers);
158         }
159
160         log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
161
162         boolean foundRouters = !routers.isEmpty();
163         if (foundRouters) {
164             sender.tell(new Messages.FindRoutersReply(routers), getSelf());
165         }
166
167         return foundRouters;
168     }
169
170     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
171             List<Pair<ActorRef, Long>> routers) {
172         if (table == null) {
173             return;
174         }
175
176         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
177         if (!routerWithUpdateTime.isEmpty()) {
178             routers.add(routerWithUpdateTime.get());
179         }
180     }
181
182     @Override
183     protected void onBucketsUpdated() {
184         if (routesUpdatedCallbacks.isEmpty()) {
185             return;
186         }
187
188         for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
189             callBack.run();
190         }
191     }
192
193     /**
194      * All messages used by the RpcRegistry.
195      */
196     public static class Messages {
197
198
199         public static class ContainsRoute {
200             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
201
202             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
203                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
204                         "Route Identifiers must be supplied");
205                 this.routeIdentifiers = routeIdentifiers;
206             }
207
208             public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
209                 return this.routeIdentifiers;
210             }
211
212             @Override
213             public String toString() {
214                 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
215             }
216         }
217
218         public static class AddOrUpdateRoutes extends ContainsRoute {
219
220             public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
221                 super(routeIdentifiers);
222             }
223         }
224
225         public static class RemoveRoutes extends ContainsRoute {
226
227             public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
228                 super(routeIdentifiers);
229             }
230         }
231
232         public static class SetLocalRouter {
233             private final ActorRef router;
234
235             public SetLocalRouter(ActorRef router) {
236                 Preconditions.checkArgument(router != null, "Router must not be null");
237                 this.router = router;
238             }
239
240             public ActorRef getRouter() {
241                 return this.router;
242             }
243
244             @Override
245             public String toString() {
246                 return "SetLocalRouter{" + "router=" + router + '}';
247             }
248         }
249
250         public static class FindRouters {
251             private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
252
253             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
254                 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
255                 this.routeIdentifier = routeIdentifier;
256             }
257
258             public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
259                 return routeIdentifier;
260             }
261
262             @Override
263             public String toString() {
264                 return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
265             }
266         }
267
268         public static class FindRoutersReply {
269             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
270
271             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
272                 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
273                 this.routerWithUpdateTime = routerWithUpdateTime;
274             }
275
276             public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
277                 return routerWithUpdateTime;
278             }
279
280             @Override
281             public String toString() {
282                 return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
283             }
284         }
285     }
286 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.