X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=b467ce949ae4a86e9500fffa9f05b85c0ba407e1;hp=845c1c819a70ca6bac0fb1a717b31f7861b6a6b6;hb=92ce52ab3df561a2a07bf56c7115123b0825449e;hpb=71c540b3572415aef56acd4a31b503f24e9da437 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 845c1c819a..b467ce949a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,21 +8,30 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.japi.Creator; import akka.japi.Option; import akka.japi.Pair; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import scala.concurrent.duration.FiniteDuration; /** * Registry to look up cluster nodes that have registered for a given rpc. @@ -31,11 +40,17 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; * cluster wide information. */ public class RpcRegistry extends BucketStore { + private final Set routesUpdatedCallbacks = new HashSet<>(); + private final FiniteDuration findRouterTimeout; - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - public RpcRegistry() { + public RpcRegistry(RemoteRpcProviderConfig config) { + super(config); getLocalBucket().setData(new RoutingTable()); + findRouterTimeout = getConfig().getGossipTickInterval().$times(10); + } + + public static Props props(RemoteRpcProviderConfig config) { + return Props.create(new RpcRegistryCreator(config)); } @Override @@ -50,6 +65,8 @@ public class RpcRegistry extends BucketStore { receiveRemoveRoutes((RemoveRoutes) message); } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); + } else if (message instanceof Runnable) { + ((Runnable)message).run(); } else { super.handleReceive(message); } @@ -77,6 +94,8 @@ public class RpcRegistry extends BucketStore { } updateLocalBucket(table); + + onBucketsUpdated(); } /** @@ -95,19 +114,63 @@ public class RpcRegistry extends BucketStore { /** * Finds routers for the given rpc. * - * @param msg + * @param findRouters */ - private void receiveGetRouter(FindRouters msg) { + private void receiveGetRouter(final FindRouters findRouters) { + log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier()); + + final ActorRef sender = getSender(); + if(!findRouters(findRouters, sender)) { + log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(), + findRouterTimeout.toMillis()); + + final AtomicReference timer = new AtomicReference<>(); + final Runnable routesUpdatedRunnable = new Runnable() { + @Override + public void run() { + if(findRouters(findRouters, sender)) { + routesUpdatedCallbacks.remove(this); + timer.get().cancel(); + } + } + }; + + routesUpdatedCallbacks.add(routesUpdatedRunnable); + + Runnable timerRunnable = new Runnable() { + @Override + public void run() { + log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier()); + + routesUpdatedCallbacks.remove(routesUpdatedRunnable); + sender.tell(new Messages.FindRoutersReply( + Collections.>emptyList()), self()); + } + }; + + timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable, + getContext().dispatcher(), self())); + } + } + + private boolean findRouters(FindRouters findRouters, ActorRef sender) { List> routers = new ArrayList<>(); - RouteIdentifier routeId = msg.getRouteIdentifier(); + RouteIdentifier routeId = findRouters.getRouteIdentifier(); findRoutes(getLocalBucket().getData(), routeId, routers); for(Bucket bucket : getRemoteBuckets().values()) { findRoutes(bucket.getData(), routeId, routers); } - getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); + log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier()); + + boolean foundRouters = !routers.isEmpty(); + if(foundRouters) { + sender.tell(new Messages.FindRoutersReply(routers), getSelf()); + } + + return foundRouters; } private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, @@ -122,6 +185,13 @@ public class RpcRegistry extends BucketStore { } } + @Override + protected void onBucketsUpdated() { + for(Runnable callBack: routesUpdatedCallbacks) { + callBack.run(); + } + } + /** * All messages used by the RpcRegistry */ @@ -224,4 +294,20 @@ public class RpcRegistry extends BucketStore { } } } + + private static class RpcRegistryCreator implements Creator { + private static final long serialVersionUID = 1L; + private final RemoteRpcProviderConfig config; + + private RpcRegistryCreator(RemoteRpcProviderConfig config) { + this.config = config; + } + + @Override + public RpcRegistry create() throws Exception { + RpcRegistry registry = new RpcRegistry(config); + RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry); + return registry; + } + } }