/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Props;
13 import akka.japi.Option;
14 import akka.japi.Pair;
15 import com.google.common.base.Preconditions;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
21 import java.util.concurrent.atomic.AtomicReference;
22 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
23 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
24 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
25 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
29 import org.opendaylight.controller.sal.connector.api.RpcRouter;
30 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
31 import scala.concurrent.duration.FiniteDuration;
/**
 * Registry to look up cluster nodes that have registered for a given rpc.
 *
 * <p>It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
 * cluster-wide information.
 */
40 public class RpcRegistry extends BucketStore<RoutingTable> {
41 private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
42 private final FiniteDuration findRouterTimeout;
/**
 * Creates the registry, seeds the local bucket with an empty {@link RoutingTable} and derives the
 * FindRouters timeout as ten gossip tick intervals.
 *
 * @param config the remote rpc provider configuration
 */
public RpcRegistry(RemoteRpcProviderConfig config) {
    // NOTE(review): BucketStore is not visible here; this assumes it has a constructor taking the
    // config, which the inherited getConfig() call below relies on.
    super(config);
    getLocalBucket().setData(new RoutingTable());
    findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
}
50 public static Props props(RemoteRpcProviderConfig config) {
51 return Props.create(RpcRegistry.class, config);
55 protected void handleReceive(Object message) throws Exception {
56 //TODO: if sender is remote, reject message
58 if (message instanceof SetLocalRouter) {
59 receiveSetLocalRouter((SetLocalRouter) message);
60 } else if (message instanceof AddOrUpdateRoutes) {
61 receiveAddRoutes((AddOrUpdateRoutes) message);
62 } else if (message instanceof RemoveRoutes) {
63 receiveRemoveRoutes((RemoveRoutes) message);
64 } else if (message instanceof Messages.FindRouters) {
65 receiveGetRouter((FindRouters) message);
66 } else if (message instanceof Runnable) {
67 ((Runnable)message).run();
69 super.handleReceive(message);
74 * Registers a rpc broker.
76 * @param message contains {@link akka.actor.ActorRef} for rpc broker
78 private void receiveSetLocalRouter(SetLocalRouter message) {
79 getLocalBucket().getData().setRouter(message.getRouter());
82 private void receiveAddRoutes(AddOrUpdateRoutes msg) {
84 log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
86 RoutingTable table = getLocalBucket().getData().copy();
87 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
88 table.addRoute(routeId);
91 updateLocalBucket(table);
97 * Processes a RemoveRoutes message.
99 * @param msg contains list of route ids to remove
101 private void receiveRemoveRoutes(RemoveRoutes msg) {
103 RoutingTable table = getLocalBucket().getData().copy();
104 for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
105 table.removeRoute(routeId);
108 updateLocalBucket(table);
112 * Finds routers for the given rpc.
114 * @param findRouters the FindRouters request
116 private void receiveGetRouter(final FindRouters findRouters) {
117 log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
119 final ActorRef sender = getSender();
120 if (!findRouters(findRouters, sender)) {
121 log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
122 findRouterTimeout.toMillis());
124 final AtomicReference<Cancellable> timer = new AtomicReference<>();
125 final Runnable routesUpdatedRunnable = new Runnable() {
128 if (findRouters(findRouters, sender)) {
129 routesUpdatedCallbacks.remove(this);
130 timer.get().cancel();
135 routesUpdatedCallbacks.add(routesUpdatedRunnable);
137 Runnable timerRunnable = () -> {
138 log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
140 routesUpdatedCallbacks.remove(routesUpdatedRunnable);
141 sender.tell(new Messages.FindRoutersReply(
142 Collections.<Pair<ActorRef, Long>>emptyList()), self());
145 timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
146 getContext().dispatcher(), self()));
150 private boolean findRouters(FindRouters findRouters, ActorRef sender) {
151 List<Pair<ActorRef, Long>> routers = new ArrayList<>();
153 RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
154 findRoutes(getLocalBucket().getData(), routeId, routers);
156 for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
157 findRoutes(bucket.getData(), routeId, routers);
160 log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
162 boolean foundRouters = !routers.isEmpty();
164 sender.tell(new Messages.FindRoutersReply(routers), getSelf());
170 private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
171 List<Pair<ActorRef, Long>> routers) {
176 Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
177 if (!routerWithUpdateTime.isEmpty()) {
178 routers.add(routerWithUpdateTime.get());
183 protected void onBucketsUpdated() {
184 if (routesUpdatedCallbacks.isEmpty()) {
188 for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
/**
 * All messages used by the RpcRegistry.
 */
196 public static class Messages {
199 public static class ContainsRoute {
200 final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
202 public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
203 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
204 "Route Identifiers must be supplied");
205 this.routeIdentifiers = routeIdentifiers;
208 public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
209 return this.routeIdentifiers;
213 public String toString() {
214 return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
218 public static class AddOrUpdateRoutes extends ContainsRoute {
220 public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
221 super(routeIdentifiers);
225 public static class RemoveRoutes extends ContainsRoute {
227 public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
228 super(routeIdentifiers);
232 public static class SetLocalRouter {
233 private final ActorRef router;
235 public SetLocalRouter(ActorRef router) {
236 Preconditions.checkArgument(router != null, "Router must not be null");
237 this.router = router;
240 public ActorRef getRouter() {
245 public String toString() {
246 return "SetLocalRouter{" + "router=" + router + '}';
250 public static class FindRouters {
251 private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
253 public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
254 Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
255 this.routeIdentifier = routeIdentifier;
258 public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
259 return routeIdentifier;
263 public String toString() {
264 return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
268 public static class FindRoutersReply {
269 final List<Pair<ActorRef, Long>> routerWithUpdateTime;
271 public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
272 Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
273 this.routerWithUpdateTime = routerWithUpdateTime;
276 public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
277 return routerWithUpdateTime;
281 public String toString() {
282 return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';