X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=e36060cc13ece309f04f10adebdb74a62f158146;hb=91cbd66f53b89d89e419bf23e6969da1f8df137e;hp=7cb505aa98d3d8c81c70cae5bfd5292f4da5ffd2;hpb=fe4049d34de103016d11f3a9050853c6380646d3;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 7cb505aa98..5ba97a306f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,196 +7,181 @@ */ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorSelection; +import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; - -import java.util.LinkedHashSet; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; +import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; /** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. + * Registry to look up cluster nodes that have registered for a given RPC. * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. - * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this + * cluster wide information. */ +public class RpcRegistry extends BucketStoreActor { + private final ActorRef rpcRegistrar; + private final RemoteRpcRegistryMXBeanImpl mxBean; + + public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); + this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); + this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), + config.getAskDuration()), config.getAskDuration()); + } -public class RpcRegistry extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); - private RoutingTable, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistry(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTable<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistry create() throws Exception { - return new RpcRegistry(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); + /** + * Create a new props instance for instantiating an RpcRegistry actor. + * + * @param config Provider configuration + * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param rpcInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance + */ + public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); } - } - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + @Override + public void postStop() { + super.postStop(); + this.mxBean.unregister(); + } - getSender().tell(routedRpcReply, self()); - } + @Override + protected void handleCommand(final Object message) throws Exception { + if (message instanceof AddOrUpdateRoutes) { + receiveAddRoutes((AddOrUpdateRoutes) message); + } else if (message instanceof RemoveRoutes) { + receiveRemoveRoutes((RemoveRoutes) message); + } else { + super.handleCommand(message); + } + } - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + private void receiveAddRoutes(final AddOrUpdateRoutes msg) { + LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers())); + } + + /** + * Processes a RemoveRoutes message. + * + * @param msg contains list of route ids to remove + */ + private void receiveRemoveRoutes(final RemoveRoutes msg) { + LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers())); + } - getSender().tell(rpcReply, self()); - } + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender()); + } - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + @Override + protected void onBucketsUpdated(final Map> buckets) { + final Map> endpoints = new HashMap<>(buckets.size()); - getSender().tell("Success", self()); - } + for (Entry> e : buckets.entrySet()) { + final RoutingTable table = e.getValue().getData(); - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + final Collection rpcs = table.getRoutes(); + endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() + : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs))); + } - getSender().tell("Success", self()); - } + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + } + } - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + public static final class RemoteRpcEndpoint { + private final Set rpcs; + private final ActorRef router; - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + @VisibleForTesting + public RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = Preconditions.checkNotNull(router); + this.rpcs = ImmutableSet.copyOf(rpcs); + } - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); + public ActorRef getRouter() { + return router; + } - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + public Set getRpcs() { + return rpcs; + } } - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); + /** + * All messages used by the RpcRegistry. + */ + public static class Messages { + abstract static class AbstractRouteMessage { + final List routeIdentifiers; + + AbstractRouteMessage(final Collection routeIdentifiers) { + Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers); + } + + List getRouteIdentifiers() { + return this.routeIdentifiers; + } + + @Override + public String toString() { + return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; + } + } - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } - } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; + public static final class AddOrUpdateRoutes extends AbstractRouteMessage { + public AddOrUpdateRoutes(final Collection routeIdentifiers) { + super(routeIdentifiers); + } } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + "/user/rpc-registry"); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } + + public static final class RemoveRoutes extends AbstractRouteMessage { + public RemoveRoutes(final Collection routeIdentifiers) { + super(routeIdentifiers); + } } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry"); - } - } - return actor; - } - private class SendRoutingTable implements Runnable { + public static final class UpdateRemoteEndpoints { + private final Map> endpoints; - @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } + @VisibleForTesting + public UpdateRemoteEndpoints(final Map> endpoints) { + this.endpoints = ImmutableMap.copyOf(endpoints); + } + + public Map> getEndpoints() { + return endpoints; + } + } } - } }