X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=1545eb00d2cc9af4bda80881d197bad132f0e667;hb=2418a6052d7eba917d5972f0630cf746d22f690c;hp=fc5b618663988d304d6b51468d2da9f99250d7b2;hpb=88330d2f3ff048ab4e2e6f348ec3ea56e4c02cd4;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index fc5b618663..1545eb00d2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,80 +8,72 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.actor.Cancellable; +import akka.actor.Address; import akka.actor.Props; -import akka.japi.Option; -import akka.japi.Pair; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; -import scala.concurrent.duration.FiniteDuration; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; /** - * Registry to look up cluster nodes that have registered for a given rpc. + * Registry to look up cluster nodes that have registered for a given RPC. * *

* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ public class RpcRegistry extends BucketStore { - private final Set routesUpdatedCallbacks = new HashSet<>(); - private final FiniteDuration findRouterTimeout; + private final ActorRef rpcRegistrar; - public RpcRegistry(RemoteRpcProviderConfig config) { - super(config); - getLocalBucket().setData(new RoutingTable()); - findRouterTimeout = getConfig().getGossipTickInterval().$times(10); + public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + super(config, new RoutingTable(rpcInvoker)); + this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); } - public static Props props(RemoteRpcProviderConfig config) { - return Props.create(RpcRegistry.class, config); + /** + * Create a new props instance for instantiating an RpcRegistry actor. + * + * @param config Provider configuration + * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param rpcInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance + */ + public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); } @Override - protected void handleReceive(Object message) throws Exception { - //TODO: if sender is remote, reject message - - if (message instanceof SetLocalRouter) { - receiveSetLocalRouter((SetLocalRouter) message); - } else if (message instanceof AddOrUpdateRoutes) { + protected void handleReceive(final Object message) throws Exception { + if (message instanceof AddOrUpdateRoutes) { receiveAddRoutes((AddOrUpdateRoutes) message); } else if (message instanceof RemoveRoutes) { receiveRemoveRoutes((RemoveRoutes) message); - } else if (message instanceof Messages.FindRouters) { - receiveGetRouter((FindRouters) message); - } else if (message instanceof Runnable) { - ((Runnable)message).run(); } else { super.handleReceive(message); } } - /** - * Registers a rpc broker. - * - * @param message contains {@link akka.actor.ActorRef} for rpc broker - */ - private void receiveSetLocalRouter(SetLocalRouter message) { - getLocalBucket().getData().setRouter(message.getRouter()); - } - - private void receiveAddRoutes(AddOrUpdateRoutes msg) { - - log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + private void receiveAddRoutes(final AddOrUpdateRoutes msg) { + LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); RoutingTable table = getLocalBucket().getData().copy(); for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { @@ -89,8 +81,6 @@ public class RpcRegistry extends BucketStore { } updateLocalBucket(table); - - onBucketsUpdated(); } /** @@ -98,8 +88,7 @@ public class RpcRegistry extends BucketStore { * * @param msg contains list of route ids to remove */ - private void receiveRemoveRoutes(RemoveRoutes msg) { - + private void receiveRemoveRoutes(final RemoveRoutes msg) { RoutingTable table = getLocalBucket().getData().copy(); for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { table.removeRoute(routeId); @@ -108,85 +97,52 @@ public class RpcRegistry extends BucketStore { updateLocalBucket(table); } - /** - * Finds routers for the given rpc. - * - * @param findRouters the FindRouters request - */ - private void receiveGetRouter(final FindRouters findRouters) { - log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier()); - - final ActorRef sender = getSender(); - if (!findRouters(findRouters, sender)) { - log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(), - findRouterTimeout.toMillis()); - - final AtomicReference timer = new AtomicReference<>(); - final Runnable routesUpdatedRunnable = new Runnable() { - @Override - public void run() { - if (findRouters(findRouters, sender)) { - routesUpdatedCallbacks.remove(this); - timer.get().cancel(); - } - } - }; - - routesUpdatedCallbacks.add(routesUpdatedRunnable); - - Runnable timerRunnable = () -> { - log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier()); - - routesUpdatedCallbacks.remove(routesUpdatedRunnable); - sender.tell(new Messages.FindRoutersReply( - Collections.>emptyList()), self()); - }; - - timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable, - getContext().dispatcher(), self())); - } + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender()); } - private boolean findRouters(FindRouters findRouters, ActorRef sender) { - List> routers = new ArrayList<>(); - - RouteIdentifier routeId = findRouters.getRouteIdentifier(); - findRoutes(getLocalBucket().getData(), routeId, routers); + @Override + protected void onBucketsUpdated(final Map> buckets) { + final Map> endpoints = new HashMap<>(buckets.size()); + + for (Entry> e : buckets.entrySet()) { + final RoutingTable table = e.getValue().getData(); + + final List rpcs = new ArrayList<>(table.getRoutes().size()); + for (RouteIdentifier ri : table.getRoutes()) { + if (ri instanceof RouteIdentifierImpl) { + final RouteIdentifierImpl id = (RouteIdentifierImpl) ri; + rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute())); + } else { + LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey()); + } + } - for (Bucket bucket : getRemoteBuckets().values()) { - findRoutes(bucket.getData(), routeId, routers); + endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() + : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs))); } - log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier()); - - boolean foundRouters = !routers.isEmpty(); - if (foundRouters) { - sender.tell(new Messages.FindRoutersReply(routers), getSelf()); + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); } - - return foundRouters; } - private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, - List> routers) { - if (table == null) { - return; - } + public static final class RemoteRpcEndpoint { + private final Set rpcs; + private final ActorRef router; - Option> routerWithUpdateTime = table.getRouterFor(routeId); - if (!routerWithUpdateTime.isEmpty()) { - routers.add(routerWithUpdateTime.get()); + RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = Preconditions.checkNotNull(router); + this.rpcs = ImmutableSet.copyOf(rpcs); } - } - @Override - protected void onBucketsUpdated() { - if (routesUpdatedCallbacks.isEmpty()) { - return; + public ActorRef getRouter() { + return router; } - for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) { - callBack.run(); + public Set getRpcs() { + return rpcs; } } @@ -194,18 +150,16 @@ public class RpcRegistry extends BucketStore { * All messages used by the RpcRegistry. */ public static class Messages { - - - public static class ContainsRoute { + abstract static class AbstractRouteMessage { final List> routeIdentifiers; - public ContainsRoute(List> routeIdentifiers) { + AbstractRouteMessage(final List> routeIdentifiers) { Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), "Route Identifiers must be supplied"); this.routeIdentifiers = routeIdentifiers; } - public List> getRouteIdentifiers() { + List> getRouteIdentifiers() { return this.routeIdentifiers; } @@ -215,71 +169,27 @@ public class RpcRegistry extends BucketStore { } } - public static class AddOrUpdateRoutes extends ContainsRoute { - - public AddOrUpdateRoutes(List> routeIdentifiers) { + public static final class AddOrUpdateRoutes extends AbstractRouteMessage { + public AddOrUpdateRoutes(final List> routeIdentifiers) { super(routeIdentifiers); } } - public static class RemoveRoutes extends ContainsRoute { - - public RemoveRoutes(List> routeIdentifiers) { + public static final class RemoveRoutes extends AbstractRouteMessage { + public RemoveRoutes(final List> routeIdentifiers) { super(routeIdentifiers); } } - public static class SetLocalRouter { - private final ActorRef router; - - public SetLocalRouter(ActorRef router) { - Preconditions.checkArgument(router != null, "Router must not be null"); - this.router = router; - } - - public ActorRef getRouter() { - return this.router; - } - - @Override - public String toString() { - return "SetLocalRouter{" + "router=" + router + '}'; - } - } - - public static class FindRouters { - private final RpcRouter.RouteIdentifier routeIdentifier; - - public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { - Preconditions.checkArgument(routeIdentifier != null, "Route must not be null"); - this.routeIdentifier = routeIdentifier; - } - - public RpcRouter.RouteIdentifier getRouteIdentifier() { - return routeIdentifier; - } + public static final class UpdateRemoteEndpoints { + private final Map> endpoints; - @Override - public String toString() { - return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}'; + UpdateRemoteEndpoints(final Map> endpoints) { + this.endpoints = ImmutableMap.copyOf(endpoints); } - } - public static class FindRoutersReply { - final List> routerWithUpdateTime; - - public FindRoutersReply(List> routerWithUpdateTime) { - Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null"); - this.routerWithUpdateTime = routerWithUpdateTime; - } - - public List> getRouterWithUpdateTime() { - return routerWithUpdateTime; - } - - @Override - public String toString() { - return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}'; + public Map> getEndpoints() { + return endpoints; } } }