X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=219646d8478ade824d22589842c4d4ddf1edccaa;hp=7cb505aa98d3d8c81c70cae5bfd5292f4da5ffd2;hb=e4c11407593914ed4520253909d0d7669e51cfac;hpb=244d226cc66672e2e15d0b557bd1af37153d8065 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 7cb505aa98..219646d847 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,196 +7,217 @@ */ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorSelection; -import akka.actor.Address; -import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; +import akka.actor.ActorRef; +import akka.japi.Option; +import akka.japi.Pair; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; - -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; /** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. - * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. - * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. + * Registry to look up cluster nodes that have registered for a given rpc. + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this + * cluster wide information. */ +public class RpcRegistry extends BucketStore { -public class RpcRegistry extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); - private RoutingTable, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistry(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTable<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistry create() throws Exception { - return new RpcRegistry(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); + public RpcRegistry() { + getLocalBucket().setData(new RoutingTable()); + } + + @Override + protected void handleReceive(Object message) throws Exception { + //TODO: if sender is remote, reject message + + if (message instanceof SetLocalRouter) { + receiveSetLocalRouter((SetLocalRouter) message); + } else if (message instanceof AddOrUpdateRoutes) { + receiveAddRoutes((AddOrUpdateRoutes) message); + } else if (message instanceof RemoveRoutes) { + receiveRemoveRoutes((RemoveRoutes) message); + } else if (message instanceof Messages.FindRouters) { + receiveGetRouter((FindRouters) message); + } else { + super.handleReceive(message); + } } - } - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + /** + * Register's rpc broker + * + * @param message contains {@link akka.actor.ActorRef} for rpc broker + */ + private void receiveSetLocalRouter(SetLocalRouter message) { + getLocalBucket().getData().setRouter(message.getRouter()); + } - getSender().tell(routedRpcReply, self()); - } + /** + * @param msg + */ + private void receiveAddRoutes(AddOrUpdateRoutes msg) { - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); - getSender().tell(rpcReply, self()); - } + RoutingTable table = getLocalBucket().getData().copy(); + for(RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + updateLocalBucket(table); + } - getSender().tell("Success", self()); - } + /** + * @param msg contains list of route ids to remove + */ + private void receiveRemoveRoutes(RemoveRoutes msg) { - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } - getSender().tell("Success", self()); - } + updateLocalBucket(table); + } - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + /** + * Finds routers for the given rpc. + * + * @param msg + */ + private void receiveGetRouter(FindRouters msg) { + List> routers = new ArrayList<>(); - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + RouteIdentifier routeId = msg.getRouteIdentifier(); + findRoutes(getLocalBucket().getData(), routeId, routers); - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); + for(Bucket bucket : getRemoteBuckets().values()) { + findRoutes(bucket.getData(), routeId, routers); + } - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); } - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); + private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, + List> routers) { + if (table == null) { + return; + } - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } + Option> routerWithUpdateTime = table.getRouterFor(routeId); + if(!routerWithUpdateTime.isEmpty()) { + routers.add(routerWithUpdateTime.get()); + } } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; + + /** + * All messages used by the RpcRegistry + */ + public static class Messages { + + + public static class ContainsRoute { + final List> routeIdentifiers; + + public ContainsRoute(List> routeIdentifiers) { + Preconditions.checkArgument(routeIdentifiers != null && + !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.routeIdentifiers = routeIdentifiers; + } + + public List> getRouteIdentifiers() { + return this.routeIdentifiers; + } + + @Override + public String toString() { + return "ContainsRoute{" + + "routeIdentifiers=" + routeIdentifiers + + '}'; + } } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + "/user/rpc-registry"); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } + + public static class AddOrUpdateRoutes extends ContainsRoute { + + public AddOrUpdateRoutes(List> routeIdentifiers) { + super(routeIdentifiers); + } } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry"); - } - } - return actor; - } - private class SendRoutingTable implements Runnable { + public static class RemoveRoutes extends ContainsRoute { - @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } + public RemoveRoutes(List> routeIdentifiers) { + super(routeIdentifiers); + } + } + + public static class SetLocalRouter { + private final ActorRef router; + + public SetLocalRouter(ActorRef router) { + Preconditions.checkArgument(router != null, "Router must not be null"); + this.router = router; + } + + public ActorRef getRouter() { + return this.router; + } + + @Override + public String toString() { + return "SetLocalRouter{" + + "router=" + router + + '}'; + } + } + + public static class FindRouters { + private final RpcRouter.RouteIdentifier routeIdentifier; + + public FindRouters(RpcRouter.RouteIdentifier routeIdentifier) { + Preconditions.checkArgument(routeIdentifier != null, "Route must not be null"); + this.routeIdentifier = routeIdentifier; + } + + public RpcRouter.RouteIdentifier getRouteIdentifier() { + return routeIdentifier; + } + + @Override + public String toString() { + return "FindRouters{" + + "routeIdentifier=" + routeIdentifier + + '}'; + } + } + + public static class FindRoutersReply { + final List> routerWithUpdateTime; + + public FindRoutersReply(List> routerWithUpdateTime) { + Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null"); + this.routerWithUpdateTime = routerWithUpdateTime; + } + + public List> getRouterWithUpdateTime() { + return routerWithUpdateTime; + } + + @Override + public String toString() { + return "FindRoutersReply{" + + "routerWithUpdateTime=" + routerWithUpdateTime + + '}'; + } + } } - } }