X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=fc5b618663988d304d6b51468d2da9f99250d7b2;hp=e2ebcb2b25a62c3f60232db52e90749736561948;hb=b78ee4d6b08e2cc0cf5edd01af0e54c3bf619ab5;hpb=f3bc7a6b7d0326e5594604cdc144b967c2a9cdb4 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index e2ebcb2b25..fc5b618663 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,287 +8,190 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Option; import akka.japi.Pair; -import akka.pattern.Patterns; import com.google.common.base.Preconditions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import scala.concurrent.Future; - import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.Map; - -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import scala.concurrent.duration.FiniteDuration; /** * Registry to look up cluster nodes that have registered for a given rpc. - *

+ * + *

* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ -public class RpcRegistry extends UntypedActor { - - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - /** - * Store to keep the registry. Bucket store sync's it across nodes in the cluster - */ - private ActorRef bucketStore; - - /** - * Rpc broker that would use the registry to route requests. - */ - private ActorRef localRouter; - - public RpcRegistry() { - bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); +public class RpcRegistry extends BucketStore { + private final Set routesUpdatedCallbacks = new HashSet<>(); + private final FiniteDuration findRouterTimeout; + + public RpcRegistry(RemoteRpcProviderConfig config) { + super(config); + getLocalBucket().setData(new RoutingTable()); + findRouterTimeout = getConfig().getGossipTickInterval().$times(10); } - public RpcRegistry(ActorRef bucketStore) { - this.bucketStore = bucketStore; + public static Props props(RemoteRpcProviderConfig config) { + return Props.create(RpcRegistry.class, config); } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: message [{}]", message); - + protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message - if (message instanceof SetLocalRouter) + if (message instanceof SetLocalRouter) { receiveSetLocalRouter((SetLocalRouter) message); - - if (message instanceof AddOrUpdateRoutes) + } else if (message instanceof AddOrUpdateRoutes) { receiveAddRoutes((AddOrUpdateRoutes) message); - - else if (message instanceof RemoveRoutes) + } else if (message instanceof RemoveRoutes) { receiveRemoveRoutes((RemoveRoutes) message); - - else if (message instanceof Messages.FindRouters) + } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); - - else - unhandled(message); + } else if (message instanceof Runnable) { + ((Runnable)message).run(); + } else { + super.handleReceive(message); + } } /** - * Register's rpc broker + * Registers a rpc broker. * * @param message contains {@link akka.actor.ActorRef} for rpc broker */ private void receiveSetLocalRouter(SetLocalRouter message) { - localRouter = message.getRouter(); + getLocalBucket().getData().setRouter(message.getRouter()); } - /** - * @param msg - */ private void receiveAddRoutes(AddOrUpdateRoutes msg) { - Preconditions.checkState(localRouter != null, "Router must be set first"); - - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); - futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); - } + log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); - /** - * @param msg contains list of route ids to remove - */ - private void receiveRemoveRoutes(RemoveRoutes msg) { + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); - futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + updateLocalBucket(table); + onBucketsUpdated(); } /** - * Finds routers for the given rpc. + * Processes a RemoveRoutes message. * - * @param msg + * @param msg contains list of route ids to remove */ - private void receiveGetRouter(FindRouters msg) { - final ActorRef sender = getSender(); + private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000); - futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); - } + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } - /** - * Helper to create empty reply when no routers are found - * - * @return - */ - private Messages.FindRoutersReply createEmptyReply() { - List> routerWithUpdateTime = Collections.emptyList(); - return new Messages.FindRoutersReply(routerWithUpdateTime); + updateLocalBucket(table); } /** - * Helper to create a reply when routers are found for the given rpc + * Finds routers for the given rpc. * - * @param buckets - * @param routeId - * @return + * @param findRouters the FindRouters request */ - private Messages.FindRoutersReply createReplyWithRouters(Map buckets, RpcRouter.RouteIdentifier routeId) { + private void receiveGetRouter(final FindRouters findRouters) { + log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier()); - List> routers = new ArrayList<>(); - Option> routerWithUpdateTime = null; + final ActorRef sender = getSender(); + if (!findRouters(findRouters, sender)) { + log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(), + findRouterTimeout.toMillis()); + + final AtomicReference timer = new AtomicReference<>(); + final Runnable routesUpdatedRunnable = new Runnable() { + @Override + public void run() { + if (findRouters(findRouters, sender)) { + routesUpdatedCallbacks.remove(this); + timer.get().cancel(); + } + } + }; - for (Bucket bucket : buckets.values()) { + routesUpdatedCallbacks.add(routesUpdatedRunnable); - RoutingTable table = (RoutingTable) bucket.getData(); - if (table == null) - continue; + Runnable timerRunnable = () -> { + log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier()); - routerWithUpdateTime = table.getRouterFor(routeId); - if (routerWithUpdateTime.isEmpty()) - continue; + routesUpdatedCallbacks.remove(routesUpdatedRunnable); + sender.tell(new Messages.FindRoutersReply( + Collections.>emptyList()), self()); + }; - routers.add(routerWithUpdateTime.get()); + timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable, + getContext().dispatcher(), self())); } - - return new Messages.FindRoutersReply(routers); } + private boolean findRouters(FindRouters findRouters, ActorRef sender) { + List> routers = new ArrayList<>(); - /// - ///private factories to create Mapper - /// - - /** - * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found - * - * @param routeId the rpc - * @param sender client who asked to find the routers. - * @return - */ - private Mapper getMapperToGetRouter(final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { + RouteIdentifier routeId = findRouters.getRouteIdentifier(); + findRoutes(getLocalBucket().getData(), routeId, routers); - if (replyMessage instanceof GetAllBucketsReply) { + for (Bucket bucket : getRemoteBuckets().values()) { + findRoutes(bucket.getData(), routeId, routers); + } - GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage; - Map buckets = reply.getBuckets(); + log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier()); - if (buckets == null || buckets.isEmpty()) { - sender.tell(createEmptyReply(), getSelf()); - return null; - } + boolean foundRouters = !routers.isEmpty(); + if (foundRouters) { + sender.tell(new Messages.FindRoutersReply(routers), getSelf()); + } - sender.tell(createReplyWithRouters(buckets, routeId), getSelf()); - } - return null; - } - }; + return foundRouters; } - /** - * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to remote - * @return - */ - private Mapper getMapperToRemoveRoutes(final List> routeIds) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - - if (!table.isEmpty()) { - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.removeRoute(routeId); - } - } - bucket.setData(table); + private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, + List> routers) { + if (table == null) { + return; + } - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } - return null; - } - }; + Option> routerWithUpdateTime = table.getRouterFor(routeId); + if (!routerWithUpdateTime.isEmpty()) { + routers.add(routerWithUpdateTime.get()); + } } - /** - * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to add - * @return - */ - private Mapper getMapperToAddRoutes(final List> routeIds) { - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.addRoute(routeId); - } - - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } + @Override + protected void onBucketsUpdated() { + if (routesUpdatedCallbacks.isEmpty()) { + return; + } - return null; - } - }; + for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) { + callBack.run(); + } } /** - * All messages used by the RpcRegistry + * All messages used by the RpcRegistry. */ public static class Messages { @@ -297,9 +200,8 @@ public class RpcRegistry extends UntypedActor { final List> routeIdentifiers; public ContainsRoute(List> routeIdentifiers) { - Preconditions.checkArgument(routeIdentifiers != null && - !routeIdentifiers.isEmpty(), - "Route Identifiers must be supplied"); + Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); this.routeIdentifiers = routeIdentifiers; } @@ -309,9 +211,7 @@ public class RpcRegistry extends UntypedActor { @Override public String toString() { - return "ContainsRoute{" + - "routeIdentifiers=" + routeIdentifiers + - '}'; + return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; } } @@ -343,9 +243,7 @@ public class RpcRegistry extends UntypedActor { @Override public String toString() { - return "SetLocalRouter{" + - "router=" + router + - '}'; + return "SetLocalRouter{" + "router=" + router + '}'; } } @@ -363,9 +261,7 @@ public class RpcRegistry extends UntypedActor { @Override public String toString() { - return "FindRouters{" + - "routeIdentifier=" + routeIdentifier + - '}'; + return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}'; } } @@ -383,9 +279,7 @@ public class RpcRegistry extends UntypedActor { @Override public String toString() { - return "FindRoutersReply{" + - "routerWithUpdateTime=" + routerWithUpdateTime + - '}'; + return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}'; } } }