/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.remote.rpc.registry;

import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.japi.Creator;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import scala.concurrent.duration.FiniteDuration;
/**
 * Registry to look up cluster nodes that have registered for a given rpc.
 *
 * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
 * cluster wide information.
 */
42 public class RpcRegistry extends BucketStore<RoutingTable> {
43 private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
44 private final FiniteDuration findRouterTimeout;
46 public RpcRegistry(RemoteRpcProviderConfig config) {
48 getLocalBucket().setData(new RoutingTable());
49 findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
52 public static Props props(RemoteRpcProviderConfig config) {
53 return Props.create(new RpcRegistryCreator(config));
57 protected void handleReceive(Object message) throws Exception {
58 //TODO: if sender is remote, reject message
60 if (message instanceof SetLocalRouter) {
61 receiveSetLocalRouter((SetLocalRouter) message);
62 } else if (message instanceof AddOrUpdateRoutes) {
63 receiveAddRoutes((AddOrUpdateRoutes) message);
64 } else if (message instanceof RemoveRoutes) {
65 receiveRemoveRoutes((RemoveRoutes) message);
66 } else if (message instanceof Messages.FindRouters) {
67 receiveGetRouter((FindRouters) message);
68 } else if (message instanceof Runnable) {
69 ((Runnable)message).run();
71 super.handleReceive(message);
76 * Register's rpc broker
78 * @param message contains {@link akka.actor.ActorRef} for rpc broker
80 private void receiveSetLocalRouter(SetLocalRouter message) {
81 getLocalBucket().getData().setRouter(message.getRouter());
87 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
89 log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
91 RoutingTable table = getLocalBucket().getData().copy();
92 for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
93 table.addRoute(routeId);
96 updateLocalBucket(table);
102 * @param msg contains list of route ids to remove
104 private void receiveRemoveRoutes(RemoveRoutes msg) {
106 RoutingTable table = getLocalBucket().getData().copy();
107 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
108 table.removeRoute(routeId);
111 updateLocalBucket(table);
115 * Finds routers for the given rpc.
119 private void receiveGetRouter(final FindRouters findRouters) {
120 log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
122 final ActorRef sender = getSender();
123 if(!findRouters(findRouters, sender)) {
124 log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
125 findRouterTimeout.toMillis());
127 final AtomicReference<Cancellable> timer = new AtomicReference<>();
128 final Runnable routesUpdatedRunnable = new Runnable() {
131 if(findRouters(findRouters, sender)) {
132 routesUpdatedCallbacks.remove(this);
133 timer.get().cancel();
138 routesUpdatedCallbacks.add(routesUpdatedRunnable);
140 Runnable timerRunnable = new Runnable() {
143 log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
145 routesUpdatedCallbacks.remove(routesUpdatedRunnable);
146 sender.tell(new Messages.FindRoutersReply(
147 Collections.<Pair<ActorRef, Long>>emptyList()), self());
151 timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
152 getContext().dispatcher(), self()));
156 private boolean findRouters(FindRouters findRouters, ActorRef sender) {
157 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
159 RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
160 findRoutes(getLocalBucket().getData(), routeId, routers);
162 for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
163 findRoutes(bucket.getData(), routeId, routers);
166 log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
168 boolean foundRouters = !routers.isEmpty();
170 sender.tell(new Messages.FindRoutersReply(routers), getSelf());
176 private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
177 List<Pair<ActorRef, Long>> routers) {
182 Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
183 if(!routerWithUpdateTime.isEmpty()) {
184 routers.add(routerWithUpdateTime.get());
189 protected void onBucketsUpdated() {
190 if(routesUpdatedCallbacks.isEmpty()) {
194 for(Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
200 * All messages used by the RpcRegistry
202 public static class Messages {
205 public static class ContainsRoute {
206 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
208 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
209 Preconditions.checkArgument(routeIdentifiers != null &&
210 !routeIdentifiers.isEmpty(),
211 "Route Identifiers must be supplied");
212 this.routeIdentifiers = routeIdentifiers;
215 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
216 return this.routeIdentifiers;
220 public String toString() {
221 return "ContainsRoute{" +
222 "routeIdentifiers=" + routeIdentifiers +
227 public static class AddOrUpdateRoutes extends ContainsRoute {
229 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
230 super(routeIdentifiers);
234 public static class RemoveRoutes extends ContainsRoute {
236 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
237 super(routeIdentifiers);
241 public static class SetLocalRouter {
242 private final ActorRef router;
244 public SetLocalRouter(ActorRef router) {
245 Preconditions.checkArgument(router != null, "Router must not be null");
246 this.router = router;
249 public ActorRef getRouter() {
254 public String toString() {
255 return "SetLocalRouter{" +
261 public static class FindRouters {
262 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
264 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
265 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
266 this.routeIdentifier = routeIdentifier;
269 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
270 return routeIdentifier;
274 public String toString() {
275 return "FindRouters{" +
276 "routeIdentifier=" + routeIdentifier +
281 public static class FindRoutersReply {
282 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
284 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
285 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
286 this.routerWithUpdateTime = routerWithUpdateTime;
289 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
290 return routerWithUpdateTime;
294 public String toString() {
295 return "FindRoutersReply{" +
296 "routerWithUpdateTime=" + routerWithUpdateTime +
302 private static class RpcRegistryCreator implements Creator<RpcRegistry> {
303 private static final long serialVersionUID = 1L;
304 private final RemoteRpcProviderConfig config;
306 private RpcRegistryCreator(RemoteRpcProviderConfig config) {
307 this.config = config;
311 public RpcRegistry create() throws Exception {
312 RpcRegistry registry = new RpcRegistry(config);
313 RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);