X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=fc5b618663988d304d6b51468d2da9f99250d7b2;hp=7cb505aa98d3d8c81c70cae5bfd5292f4da5ffd2;hb=b78ee4d6b08e2cc0cf5edd01af0e54c3bf619ab5;hpb=64781d7e080c4278e05a113a7d5f508b25605138 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 7cb505aa98..fc5b618663 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,196 +7,280 @@ */ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorSelection; -import akka.actor.Address; +import akka.actor.ActorRef; +import akka.actor.Cancellable; import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; - -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Random; +import akka.japi.Option; +import akka.japi.Pair; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import scala.concurrent.duration.FiniteDuration; /** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. - * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. + * Registry to look up cluster nodes that have registered for a given rpc. * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this + * cluster wide information. */ +public class RpcRegistry extends BucketStore { + private final Set routesUpdatedCallbacks = new HashSet<>(); + private final FiniteDuration findRouterTimeout; -public class RpcRegistry extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); - private RoutingTable, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistry(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTable<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistry create() throws Exception { - return new RpcRegistry(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); + public RpcRegistry(RemoteRpcProviderConfig config) { + super(config); + getLocalBucket().setData(new RoutingTable()); + findRouterTimeout = getConfig().getGossipTickInterval().$times(10); } - } - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + public static Props props(RemoteRpcProviderConfig config) { + return Props.create(RpcRegistry.class, config); + } - getSender().tell(routedRpcReply, self()); - } + @Override + protected void handleReceive(Object message) throws Exception { + //TODO: if sender is remote, reject message - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + if (message instanceof SetLocalRouter) { + receiveSetLocalRouter((SetLocalRouter) message); + } else if (message instanceof AddOrUpdateRoutes) { + receiveAddRoutes((AddOrUpdateRoutes) message); + } else if (message instanceof RemoveRoutes) { + receiveRemoveRoutes((RemoveRoutes) message); + } else if (message instanceof Messages.FindRouters) { + receiveGetRouter((FindRouters) message); + } else if (message instanceof Runnable) { + ((Runnable)message).run(); + } else { + super.handleReceive(message); + } + } - getSender().tell(rpcReply, self()); - } + /** + * Registers a rpc broker. + * + * @param message contains {@link akka.actor.ActorRef} for rpc broker + */ + private void receiveSetLocalRouter(SetLocalRouter message) { + getLocalBucket().getData().setRouter(message.getRouter()); + } - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + private void receiveAddRoutes(AddOrUpdateRoutes msg) { - getSender().tell("Success", self()); - } + log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } - getSender().tell("Success", self()); - } + updateLocalBucket(table); - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + onBucketsUpdated(); + } - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + /** + * Processes a RemoveRoutes message. + * + * @param msg contains list of route ids to remove + */ + private void receiveRemoveRoutes(RemoveRoutes msg) { - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + updateLocalBucket(table); } - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); + /** + * Finds routers for the given rpc. + * + * @param findRouters the FindRouters request + */ + private void receiveGetRouter(final FindRouters findRouters) { + log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier()); + + final ActorRef sender = getSender(); + if (!findRouters(findRouters, sender)) { + log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(), + findRouterTimeout.toMillis()); + + final AtomicReference timer = new AtomicReference<>(); + final Runnable routesUpdatedRunnable = new Runnable() { + @Override + public void run() { + if (findRouters(findRouters, sender)) { + routesUpdatedCallbacks.remove(this); + timer.get().cancel(); + } + } + }; + + routesUpdatedCallbacks.add(routesUpdatedRunnable); + + Runnable timerRunnable = () -> { + log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier()); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } + routesUpdatedCallbacks.remove(routesUpdatedRunnable); + sender.tell(new Messages.FindRoutersReply( + Collections.>emptyList()), self()); + }; + + timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable, + getContext().dispatcher(), self())); + } } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; - } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + "/user/rpc-registry"); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } - } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry"); - } + + private boolean findRouters(FindRouters findRouters, ActorRef sender) { + List> routers = new ArrayList<>(); + + RouteIdentifier routeId = findRouters.getRouteIdentifier(); + findRoutes(getLocalBucket().getData(), routeId, routers); + + for (Bucket bucket : getRemoteBuckets().values()) { + findRoutes(bucket.getData(), routeId, routers); + } + + log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier()); + + boolean foundRouters = !routers.isEmpty(); + if (foundRouters) { + sender.tell(new Messages.FindRoutersReply(routers), getSelf()); + } + + return foundRouters; } - return actor; - } - private class SendRoutingTable implements Runnable { + private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, + List> routers) { + if (table == null) { + return; + } + + Option> routerWithUpdateTime = table.getRouterFor(routeId); + if (!routerWithUpdateTime.isEmpty()) { + routers.add(routerWithUpdateTime.get()); + } + } @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } + protected void onBucketsUpdated() { + if (routesUpdatedCallbacks.isEmpty()) { + return; + } + + for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) { + callBack.run(); + } + } + + /** + * All messages used by the RpcRegistry. + */ + public static class Messages { + + + public static class ContainsRoute { + final List> routeIdentifiers; + + public ContainsRoute(List> routeIdentifiers) { + Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.routeIdentifiers = routeIdentifiers; + } + + public List> getRouteIdentifiers() { + return this.routeIdentifiers; + } + + @Override + public String toString() { + return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; + } + } + + public static class AddOrUpdateRoutes extends ContainsRoute { + + public AddOrUpdateRoutes(List> routeIdentifiers) { + super(routeIdentifiers); + } + } + + public static class RemoveRoutes extends ContainsRoute { + + public RemoveRoutes(List> routeIdentifiers) { + super(routeIdentifiers); + } + } + + public static class SetLocalRouter { + private final ActorRef router; + + public SetLocalRouter(ActorRef router) { + Preconditions.checkArgument(router != null, "Router must not be null"); + this.router = router; + } + + public ActorRef getRouter() { + return this.router; + } + + @Override + public String toString() { + return "SetLocalRouter{" + "router=" + router + '}'; + } + } + + public static class FindRouters { + private final RpcRouter.RouteIdentifier routeIdentifier; + + public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { + Preconditions.checkArgument(routeIdentifier != null, "Route must not be null"); + this.routeIdentifier = routeIdentifier; + } + + public RpcRouter.RouteIdentifier getRouteIdentifier() { + return routeIdentifier; + } + + @Override + public String toString() { + return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}'; + } + } + + public static class FindRoutersReply { + final List> routerWithUpdateTime; + + public FindRoutersReply(List> routerWithUpdateTime) { + Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null"); + this.routerWithUpdateTime = routerWithUpdateTime; + } + + public List> getRouterWithUpdateTime() { + return routerWithUpdateTime; + } + + @Override + public String toString() { + return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}'; + } + } } - } }