X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=fc5b618663988d304d6b51468d2da9f99250d7b2;hp=845c1c819a70ca6bac0fb1a717b31f7861b6a6b6;hb=b78ee4d6b08e2cc0cf5edd01af0e54c3bf619ab5;hpb=1e80b656857bf829d8ae3cae21b0b726190b96ea diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 845c1c819a..fc5b618663 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,13 +8,18 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import akka.actor.Cancellable; +import akka.actor.Props; import akka.japi.Option; import akka.japi.Pair; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; @@ -23,19 +28,27 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import scala.concurrent.duration.FiniteDuration; /** * Registry to look up cluster nodes that have registered for a given rpc. - *

+ * + *

* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ public class RpcRegistry extends BucketStore { + private final Set routesUpdatedCallbacks = new HashSet<>(); + private final FiniteDuration findRouterTimeout; - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - public RpcRegistry() { + public RpcRegistry(RemoteRpcProviderConfig config) { + super(config); getLocalBucket().setData(new RoutingTable()); + findRouterTimeout = getConfig().getGossipTickInterval().$times(10); + } + + public static Props props(RemoteRpcProviderConfig config) { + return Props.create(RpcRegistry.class, config); } @Override @@ -50,13 +63,15 @@ public class RpcRegistry extends BucketStore { receiveRemoveRoutes((RemoveRoutes) message); } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); + } else if (message instanceof Runnable) { + ((Runnable)message).run(); } else { super.handleReceive(message); } } /** - * Register's rpc broker + * Registers a rpc broker. * * @param message contains {@link akka.actor.ActorRef} for rpc broker */ @@ -64,22 +79,23 @@ public class RpcRegistry extends BucketStore { getLocalBucket().getData().setRouter(message.getRouter()); } - /** - * @param msg - */ private void receiveAddRoutes(AddOrUpdateRoutes msg) { log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); RoutingTable table = getLocalBucket().getData().copy(); - for(RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { table.addRoute(routeId); } updateLocalBucket(table); + + onBucketsUpdated(); } /** + * Processes a RemoveRoutes message. + * * @param msg contains list of route ids to remove */ private void receiveRemoveRoutes(RemoveRoutes msg) { @@ -95,19 +111,60 @@ public class RpcRegistry extends BucketStore { /** * Finds routers for the given rpc. * - * @param msg + * @param findRouters the FindRouters request */ - private void receiveGetRouter(FindRouters msg) { + private void receiveGetRouter(final FindRouters findRouters) { + log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier()); + + final ActorRef sender = getSender(); + if (!findRouters(findRouters, sender)) { + log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(), + findRouterTimeout.toMillis()); + + final AtomicReference timer = new AtomicReference<>(); + final Runnable routesUpdatedRunnable = new Runnable() { + @Override + public void run() { + if (findRouters(findRouters, sender)) { + routesUpdatedCallbacks.remove(this); + timer.get().cancel(); + } + } + }; + + routesUpdatedCallbacks.add(routesUpdatedRunnable); + + Runnable timerRunnable = () -> { + log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier()); + + routesUpdatedCallbacks.remove(routesUpdatedRunnable); + sender.tell(new Messages.FindRoutersReply( + Collections.>emptyList()), self()); + }; + + timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable, + getContext().dispatcher(), self())); + } + } + + private boolean findRouters(FindRouters findRouters, ActorRef sender) { List> routers = new ArrayList<>(); - RouteIdentifier routeId = msg.getRouteIdentifier(); + RouteIdentifier routeId = findRouters.getRouteIdentifier(); findRoutes(getLocalBucket().getData(), routeId, routers); - for(Bucket bucket : getRemoteBuckets().values()) { + for (Bucket bucket : getRemoteBuckets().values()) { findRoutes(bucket.getData(), routeId, routers); } - getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); + log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier()); + + boolean foundRouters = !routers.isEmpty(); + if (foundRouters) { + sender.tell(new Messages.FindRoutersReply(routers), getSelf()); + } + + return foundRouters; } private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, @@ -117,13 +174,24 @@ public class RpcRegistry extends BucketStore { } Option> routerWithUpdateTime = table.getRouterFor(routeId); - if(!routerWithUpdateTime.isEmpty()) { + if (!routerWithUpdateTime.isEmpty()) { routers.add(routerWithUpdateTime.get()); } } + @Override + protected void onBucketsUpdated() { + if (routesUpdatedCallbacks.isEmpty()) { + return; + } + + for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) { + callBack.run(); + } + } + /** - * All messages used by the RpcRegistry + * All messages used by the RpcRegistry. */ public static class Messages { @@ -132,9 +200,8 @@ public class RpcRegistry extends BucketStore { final List> routeIdentifiers; public ContainsRoute(List> routeIdentifiers) { - Preconditions.checkArgument(routeIdentifiers != null && - !routeIdentifiers.isEmpty(), - "Route Identifiers must be supplied"); + Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); this.routeIdentifiers = routeIdentifiers; } @@ -144,9 +211,7 @@ public class RpcRegistry extends BucketStore { @Override public String toString() { - return "ContainsRoute{" + - "routeIdentifiers=" + routeIdentifiers + - '}'; + return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; } } @@ -178,9 +243,7 @@ public class RpcRegistry extends BucketStore { @Override public String toString() { - return "SetLocalRouter{" + - "router=" + router + - '}'; + return "SetLocalRouter{" + "router=" + router + '}'; } } @@ -198,9 +261,7 @@ public class RpcRegistry extends BucketStore { @Override public String toString() { - return "FindRouters{" + - "routeIdentifier=" + routeIdentifier + - '}'; + return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}'; } } @@ -218,9 +279,7 @@ public class RpcRegistry extends BucketStore { @Override public String toString() { - return "FindRoutersReply{" + - "routerWithUpdateTime=" + routerWithUpdateTime + - '}'; + return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}'; } } }