package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.Cancellable;
+import akka.actor.Props;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
+ *
+ * <p>
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
public class RpcRegistry extends BucketStore<RoutingTable> {
+ private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+ private final FiniteDuration findRouterTimeout;
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- public RpcRegistry() {
+ public RpcRegistry(RemoteRpcProviderConfig config) {
+ super(config);
getLocalBucket().setData(new RoutingTable());
+ findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+ }
+
+ public static Props props(RemoteRpcProviderConfig config) {
+ return Props.create(RpcRegistry.class, config);
}
@Override
receiveRemoveRoutes((RemoveRoutes) message);
} else if (message instanceof Messages.FindRouters) {
receiveGetRouter((FindRouters) message);
+ } else if (message instanceof Runnable) {
+ ((Runnable)message).run();
} else {
super.handleReceive(message);
}
}
/**
- * Register's rpc broker
+ * Registers a rpc broker.
*
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
getLocalBucket().getData().setRouter(message.getRouter());
}
- /**
- * @param msg
- */
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
RoutingTable table = getLocalBucket().getData().copy();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
table.addRoute(routeId);
}
updateLocalBucket(table);
+
+ onBucketsUpdated();
}
/**
+ * Processes a RemoveRoutes message.
+ *
* @param msg contains list of route ids to remove
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
/**
* Finds routers for the given rpc.
*
- * @param msg
+ * @param findRouters the FindRouters request
*/
- private void receiveGetRouter(FindRouters msg) {
+ private void receiveGetRouter(final FindRouters findRouters) {
+ log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+ final ActorRef sender = getSender();
+ if (!findRouters(findRouters, sender)) {
+ log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+ findRouterTimeout.toMillis());
+
+ final AtomicReference<Cancellable> timer = new AtomicReference<>();
+ final Runnable routesUpdatedRunnable = new Runnable() {
+ @Override
+ public void run() {
+ if (findRouters(findRouters, sender)) {
+ routesUpdatedCallbacks.remove(this);
+ timer.get().cancel();
+ }
+ }
+ };
+
+ routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+ Runnable timerRunnable = () -> {
+ log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+ routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+ sender.tell(new Messages.FindRoutersReply(
+ Collections.<Pair<ActorRef, Long>>emptyList()), self());
+ };
+
+ timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+ getContext().dispatcher(), self()));
+ }
+ }
+
+ private boolean findRouters(FindRouters findRouters, ActorRef sender) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
- RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+ RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
findRoutes(getLocalBucket().getData(), routeId, routers);
- for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+ for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
findRoutes(bucket.getData(), routeId, routers);
}
- getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+ log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+ boolean foundRouters = !routers.isEmpty();
+ if (foundRouters) {
+ sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+ }
+
+ return foundRouters;
}
private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
}
Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
- if(!routerWithUpdateTime.isEmpty()) {
+ if (!routerWithUpdateTime.isEmpty()) {
routers.add(routerWithUpdateTime.get());
}
}
+ @Override
+ protected void onBucketsUpdated() {
+ if (routesUpdatedCallbacks.isEmpty()) {
+ return;
+ }
+
+ for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+ callBack.run();
+ }
+ }
+
/**
- * All messages used by the RpcRegistry
+ * All messages used by the RpcRegistry.
*/
public static class Messages {
final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null &&
- !routeIdentifiers.isEmpty(),
- "Route Identifiers must be supplied");
+ Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
this.routeIdentifiers = routeIdentifiers;
}
@Override
public String toString() {
- return "ContainsRoute{" +
- "routeIdentifiers=" + routeIdentifiers +
- '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
}
}
@Override
public String toString() {
- return "SetLocalRouter{" +
- "router=" + router +
- '}';
+ return "SetLocalRouter{" + "router=" + router + '}';
}
}
@Override
public String toString() {
- return "FindRouters{" +
- "routeIdentifier=" + routeIdentifier +
- '}';
+ return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
}
}
@Override
public String toString() {
- return "FindRoutersReply{" +
- "routerWithUpdateTime=" + routerWithUpdateTime +
- '}';
+ return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
}
}
}