X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=c8415cc818733ff67bcc9828eaf96eceac916f7c;hp=51609870cc4aad1c8789dfdcd0b68f04563b5cdf;hb=a884499896d6a146743272b08b4a7e504d9e8b9e;hpb=d7ce7c5acee7e6f7cd7895ceff5af63ac53789ad diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 51609870cc..c8415cc818 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -10,375 +10,175 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Option; -import akka.japi.Pair; -import akka.pattern.Patterns; import com.google.common.base.Preconditions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import scala.concurrent.Future; - +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; - -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; /** - * Registry to look up cluster nodes that have registered for a given rpc. + * Registry to look up cluster nodes that have registered for a given RPC. + * *

* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. - * */ -public class RpcRegistry extends UntypedActor { - - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - /** - * Store to keep the registry. Bucket store sync's it across nodes in the cluster - */ - private ActorRef bucketStore; - - /** - * Rpc broker that would use the registry to route requests. - */ - private ActorRef localRouter; +public class RpcRegistry extends BucketStore { + private final ActorRef rpcRegistrar; - public RpcRegistry() { - bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); - } - - public RpcRegistry(ActorRef bucketStore) { - this.bucketStore = bucketStore; - } - - @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: message [{}]", message); - - //TODO: if sender is remote, reject message - - if (message instanceof SetLocalRouter) - receiveSetLocalRouter((SetLocalRouter) message); - - if (message instanceof AddOrUpdateRoute) - receiveAddRoute((AddOrUpdateRoute) message); - - else if (message instanceof RemoveRoute) - receiveRemoveRoute((RemoveRoute) message); - - else if (message instanceof Messages.FindRouters) - receiveGetRouter((Messages.FindRouters) message); - - else - unhandled(message); + public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + super(config, new RoutingTable(rpcInvoker)); + this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); } /** - * Register's rpc broker + * Create a new props instance for instantiating an RpcRegistry actor. * - * @param message contains {@link akka.actor.ActorRef} for rpc broker - */ - private void receiveSetLocalRouter(SetLocalRouter message) { - if (message == null || message.getRouter() == null) - return;//ignore - - localRouter = message.getRouter(); - } - - /** - * //TODO: update this to accept multiple route registration - * @param msg + * @param config Provider configuration + * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param rpcInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance */ - private void receiveAddRoute(AddOrUpdateRoute msg) { - if (msg.getRouteIdentifier() == null) - return;//ignore - - Preconditions.checkState(localRouter != null, "Router must be set first"); - - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); - futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher()); + public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); } - /** - * //TODO: update this to accept multiple routes - * @param msg - */ - private void receiveRemoveRoute(RemoveRoute msg) { - if (msg.getRouteIdentifier() == null) - return;//ignore - - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); - futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher()); - - } - - /** - * Finds routers for the given rpc. - * @param msg - */ - private void receiveGetRouter(Messages.FindRouters msg) { - final ActorRef sender = getSender(); - - //if empty message, return empty list - if (msg.getRouteIdentifier() == null) { - sender.tell(createEmptyReply(), getSelf()); - return; + @Override + protected void handleReceive(final Object message) throws Exception { + if (message instanceof AddOrUpdateRoutes) { + receiveAddRoutes((AddOrUpdateRoutes) message); + } else if (message instanceof RemoveRoutes) { + receiveRemoveRoutes((RemoveRoutes) message); + } else { + super.handleReceive(message); } + } - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000); - futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); - + private void receiveAddRoutes(final AddOrUpdateRoutes msg) { + LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers())); } /** - * Helper to create empty reply when no routers are found + * Processes a RemoveRoutes message. * - * @return + * @param msg contains list of route ids to remove */ - private Messages.FindRoutersReply createEmptyReply() { - List> routerWithUpdateTime = Collections.emptyList(); - return new Messages.FindRoutersReply(routerWithUpdateTime); + private void receiveRemoveRoutes(final RemoveRoutes msg) { + LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers())); } - /** - * Helper to create a reply when routers are found for the given rpc - * @param buckets - * @param routeId - * @return - */ - private Messages.FindRoutersReply createReplyWithRouters(Map buckets, RpcRouter.RouteIdentifier routeId) { - - List> routers = new ArrayList<>(); - - Option> routerWithUpdateTime = null; - - for (Bucket bucket : buckets.values()) { - - RoutingTable table = (RoutingTable) bucket.getData(); - - if (table == null) - continue; - - routerWithUpdateTime = table.getRouterFor(routeId); - - if (routerWithUpdateTime.isEmpty()) - continue; - - routers.add(routerWithUpdateTime.get()); - } - - return new Messages.FindRoutersReply(routers); + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender()); } - - /// - ///private factories to create Mapper - /// - - /** - * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found - * - * @param routeId the rpc - * @param sender client who asked to find the routers. - * @return - */ - private Mapper getMapperToGetRouter(final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - - if (replyMessage instanceof GetAllBucketsReply) { - - GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage; - Map buckets = reply.getBuckets(); - - if (buckets == null || buckets.isEmpty()) { - sender.tell(createEmptyReply(), getSelf()); - return null; - } - - sender.tell(createReplyWithRouters(buckets, routeId), getSelf()); + @Override + protected void onBucketsUpdated(final Map> buckets) { + final Map> endpoints = new HashMap<>(buckets.size()); + + for (Entry> e : buckets.entrySet()) { + final RoutingTable table = e.getValue().getData(); + + final List rpcs = new ArrayList<>(table.getRoutes().size()); + for (RouteIdentifier ri : table.getRoutes()) { + if (ri instanceof RouteIdentifierImpl) { + final RouteIdentifierImpl id = (RouteIdentifierImpl) ri; + rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute())); + } else { + LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey()); } - return null; } - }; - } - - /** - * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeId rpc to remote - * @return - */ - private Mapper getMapperToRemoveRoute(final RpcRouter.RouteIdentifier routeId) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - table.setRouter(localRouter); - table.removeRoute(routeId); - - bucket.setData(table); + endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() + : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs))); + } - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } - return null; - } - }; + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + } } - /** - * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeId rpc to add - * @return - */ - private Mapper getMapperToAddRoute(final RpcRouter.RouteIdentifier routeId) { - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); + public static final class RemoteRpcEndpoint { + private final Set rpcs; + private final ActorRef router; - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - table.addRoute(routeId); - - bucket.setData(table); + RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = Preconditions.checkNotNull(router); + this.rpcs = ImmutableSet.copyOf(rpcs); + } - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } + public ActorRef getRouter() { + return router; + } - return null; - } - }; + public Set getRpcs() { + return rpcs; + } } /** - * All messages used by the RpcRegistry + * All messages used by the RpcRegistry. */ public static class Messages { + abstract static class AbstractRouteMessage { + final List> routeIdentifiers; - - public static class ContainsRoute { - final RpcRouter.RouteIdentifier routeIdentifier; - - public ContainsRoute(RpcRouter.RouteIdentifier routeIdentifier) { - Preconditions.checkArgument(routeIdentifier != null); - this.routeIdentifier = routeIdentifier; + AbstractRouteMessage(final List> routeIdentifiers) { + Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.routeIdentifiers = routeIdentifiers; } - public RpcRouter.RouteIdentifier getRouteIdentifier(){ - return this.routeIdentifier; + List> getRouteIdentifiers() { + return this.routeIdentifiers; } @Override public String toString() { - return this.getClass().getSimpleName() + "{" + - "routeIdentifier=" + routeIdentifier + - '}'; + return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; } } - public static class AddOrUpdateRoute extends ContainsRoute{ - - public AddOrUpdateRoute(RpcRouter.RouteIdentifier routeIdentifier) { - super(routeIdentifier); + public static final class AddOrUpdateRoutes extends AbstractRouteMessage { + public AddOrUpdateRoutes(final List> routeIdentifiers) { + super(routeIdentifiers); } } - public static class RemoveRoute extends ContainsRoute { - - public RemoveRoute(RpcRouter.RouteIdentifier routeIdentifier) { - super(routeIdentifier); + public static final class RemoveRoutes extends AbstractRouteMessage { + public RemoveRoutes(final List> routeIdentifiers) { + super(routeIdentifiers); } } - public static class SetLocalRouter{ - private final ActorRef router; + public static final class UpdateRemoteEndpoints { + private final Map> endpoints; - public SetLocalRouter(ActorRef router) { - this.router = router; + UpdateRemoteEndpoints(final Map> endpoints) { + this.endpoints = ImmutableMap.copyOf(endpoints); } - public ActorRef getRouter(){ - return this.router; - } - - @Override - public String toString() { - return "SetLocalRouter{" + - "router=" + router + - '}'; - } - } - - public static class FindRouters extends ContainsRoute { - public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { - super(routeIdentifier); - } - } - - public static class FindRoutersReply { - final List> routerWithUpdateTime; - - public FindRoutersReply(List> routerWithUpdateTime) { - this.routerWithUpdateTime = routerWithUpdateTime; - } - - public List> getRouterWithUpdateTime(){ - return routerWithUpdateTime; - } - - @Override - public String toString() { - return "FindRoutersReply{" + - "routerWithUpdateTime=" + routerWithUpdateTime + - '}'; + public Map> getEndpoints() { + return endpoints; } } }