X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=2c89f1426072a6e945de5ed107fd3b07a53ef7df;hp=845c1c819a70ca6bac0fb1a717b31f7861b6a6b6;hb=HEAD;hpb=d58f9f17f0f552105fa2c01960991f484b51e733 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 845c1c819a..8d66ed8ccb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,220 +7,193 @@ */ package org.opendaylight.controller.remote.rpc.registry; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Option; -import akka.japi.Pair; -import com.google.common.base.Preconditions; -import java.util.ArrayList; +import akka.actor.Address; +import akka.actor.Props; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; +import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; /** - * Registry to look up cluster nodes that have registered for a given rpc. - *

- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this + * Registry to look up cluster nodes that have registered for a given RPC. + * + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this * cluster wide information. */ -public class RpcRegistry extends BucketStore { +public class RpcRegistry extends BucketStoreActor { + private final ActorRef rpcRegistrar; + private RemoteRpcRegistryMXBeanImpl mxBean; - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - public RpcRegistry() { - getLocalBucket().setData(new RoutingTable()); - } + public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); + this.rpcRegistrar = requireNonNull(rpcRegistrar); - @Override - protected void handleReceive(Object message) throws Exception { - //TODO: if sender is remote, reject message - - if (message instanceof SetLocalRouter) { - receiveSetLocalRouter((SetLocalRouter) message); - } else if (message instanceof AddOrUpdateRoutes) { - receiveAddRoutes((AddOrUpdateRoutes) message); - } else if (message instanceof RemoveRoutes) { - receiveRemoveRoutes((RemoveRoutes) message); - } else if (message instanceof Messages.FindRouters) { - receiveGetRouter((FindRouters) message); - } else { - super.handleReceive(message); - } } /** - * Register's rpc broker + * Create a new props instance for instantiating an RpcRegistry actor. * - * @param message contains {@link akka.actor.ActorRef} for rpc broker + * @param config Provider configuration + * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param rpcInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance */ - private void receiveSetLocalRouter(SetLocalRouter message) { - getLocalBucket().getData().setRouter(message.getRouter()); + public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); } - /** - * @param msg - */ - private void receiveAddRoutes(AddOrUpdateRoutes msg) { - - log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + @Override + public void preStart() { + super.preStart(); + mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), + getConfig().getAskDuration()), getConfig().getAskDuration()); + } - RoutingTable table = getLocalBucket().getData().copy(); - for(RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { - table.addRoute(routeId); + @Override + public void postStop() throws Exception { + if (mxBean != null) { + mxBean.unregister(); + mxBean = null; } - - updateLocalBucket(table); + super.postStop(); } - /** - * @param msg contains list of route ids to remove - */ - private void receiveRemoveRoutes(RemoveRoutes msg) { - - RoutingTable table = getLocalBucket().getData().copy(); - for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { - table.removeRoute(routeId); + @Override + protected void handleCommand(final Object message) throws Exception { + if (message instanceof AddOrUpdateRoutes addRoutes) { + receiveAddRoutes(addRoutes); + } else if (message instanceof RemoveRoutes removeRoutes) { + receiveRemoveRoutes(removeRoutes); + } else { + super.handleCommand(message); } + } - updateLocalBucket(table); + private void receiveAddRoutes(final AddOrUpdateRoutes msg) { + LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers())); } /** - * Finds routers for the given rpc. + * Processes a RemoveRoutes message. * - * @param msg + * @param msg contains list of route ids to remove */ - private void receiveGetRouter(FindRouters msg) { - List> routers = new ArrayList<>(); + private void receiveRemoveRoutes(final RemoveRoutes msg) { + LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers())); + } - RouteIdentifier routeId = msg.getRouteIdentifier(); - findRoutes(getLocalBucket().getData(), routeId, routers); + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), + ActorRef.noSender()); + } - for(Bucket bucket : getRemoteBuckets().values()) { - findRoutes(bucket.getData(), routeId, routers); - } + @Override + protected void onBucketsUpdated(final Map> buckets) { + final Map> endpoints = new HashMap<>(buckets.size()); - getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); - } + for (Entry> e : buckets.entrySet()) { + final RoutingTable table = e.getValue().getData(); - private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, - List> routers) { - if (table == null) { - return; + final Collection rpcs = table.getItems(); + endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() + : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs))); } - Option> routerWithUpdateTime = table.getRouterFor(routeId); - if(!routerWithUpdateTime.isEmpty()) { - routers.add(routerWithUpdateTime.get()); + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); } } - /** - * All messages used by the RpcRegistry - */ - public static class Messages { - - - public static class ContainsRoute { - final List> routeIdentifiers; - - public ContainsRoute(List> routeIdentifiers) { - Preconditions.checkArgument(routeIdentifiers != null && - !routeIdentifiers.isEmpty(), - "Route Identifiers must be supplied"); - this.routeIdentifiers = routeIdentifiers; - } + public static final class RemoteRpcEndpoint { + private final Set rpcs; + private final ActorRef router; - public List> getRouteIdentifiers() { - return this.routeIdentifiers; - } - - @Override - public String toString() { - return "ContainsRoute{" + - "routeIdentifiers=" + routeIdentifiers + - '}'; - } + @VisibleForTesting + public RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = requireNonNull(router); + this.rpcs = ImmutableSet.copyOf(rpcs); } - public static class AddOrUpdateRoutes extends ContainsRoute { - - public AddOrUpdateRoutes(List> routeIdentifiers) { - super(routeIdentifiers); - } + public ActorRef getRouter() { + return router; } - public static class RemoveRoutes extends ContainsRoute { - - public RemoveRoutes(List> routeIdentifiers) { - super(routeIdentifiers); - } + public Set getRpcs() { + return rpcs; } + } - public static class SetLocalRouter { - private final ActorRef router; + /** + * All messages used by the RpcRegistry. + */ + public static class Messages { + abstract static class AbstractRouteMessage { + final List rpcRouteIdentifiers; - public SetLocalRouter(ActorRef router) { - Preconditions.checkArgument(router != null, "Router must not be null"); - this.router = router; + AbstractRouteMessage(final Collection rpcRouteIdentifiers) { + checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers); } - public ActorRef getRouter() { - return this.router; + List getRouteIdentifiers() { + return rpcRouteIdentifiers; } @Override public String toString() { - return "SetLocalRouter{" + - "router=" + router + - '}'; + return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}'; } } - public static class FindRouters { - private final RpcRouter.RouteIdentifier routeIdentifier; - - public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { - Preconditions.checkArgument(routeIdentifier != null, "Route must not be null"); - this.routeIdentifier = routeIdentifier; + public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage { + public AddOrUpdateRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); } - public RpcRouter.RouteIdentifier getRouteIdentifier() { - return routeIdentifier; - } + } - @Override - public String toString() { - return "FindRouters{" + - "routeIdentifier=" + routeIdentifier + - '}'; + public static final class RemoveRoutes extends AbstractRouteMessage { + public RemoveRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); } } - public static class FindRoutersReply { - final List> routerWithUpdateTime; + public static final class UpdateRemoteEndpoints { + private final Map> rpcEndpoints; - public FindRoutersReply(List> routerWithUpdateTime) { - Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null"); - this.routerWithUpdateTime = routerWithUpdateTime; - } - public List> getRouterWithUpdateTime() { - return routerWithUpdateTime; + @VisibleForTesting + public UpdateRemoteEndpoints(final Map> rpcEndpoints) { + this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints); } - @Override - public String toString() { - return "FindRoutersReply{" + - "routerWithUpdateTime=" + routerWithUpdateTime + - '}'; + public Map> getRpcEndpoints() { + return rpcEndpoints; } } }