X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=1545eb00d2cc9af4bda80881d197bad132f0e667;hb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;hp=e36060cc13ece309f04f10adebdb74a62f158146;hpb=5a9287bb6ddaaa8805939b3b3301f648c03785f4;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index e36060cc13..1545eb00d2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,197 +7,190 @@ */ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorSelection; +import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.ActorConstants; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; - -import java.util.LinkedHashSet; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; /** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. - * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. + * Registry to look up cluster nodes that have registered for a given RPC. * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this + * cluster wide information. */ +public class RpcRegistry extends BucketStore { + private final ActorRef rpcRegistrar; -public class RpcRegistry extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); - private RoutingTable, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistry(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTable<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistry create() throws Exception { - return new RpcRegistry(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); + public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + super(config, new RoutingTable(rpcInvoker)); + this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); } - } - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + /** + * Create a new props instance for instantiating an RpcRegistry actor. + * + * @param config Provider configuration + * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param rpcInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance + */ + public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); + } - getSender().tell(routedRpcReply, self()); - } + @Override + protected void handleReceive(final Object message) throws Exception { + if (message instanceof AddOrUpdateRoutes) { + receiveAddRoutes((AddOrUpdateRoutes) message); + } else if (message instanceof RemoveRoutes) { + receiveRemoveRoutes((RemoveRoutes) message); + } else { + super.handleReceive(message); + } + } - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + private void receiveAddRoutes(final AddOrUpdateRoutes msg) { + LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); - getSender().tell(rpcReply, self()); - } + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } + + updateLocalBucket(table); + } - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + /** + * Processes a RemoveRoutes message. + * + * @param msg contains list of route ids to remove + */ + private void receiveRemoveRoutes(final RemoveRoutes msg) { + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } - getSender().tell("Success", self()); - } + updateLocalBucket(table); + } + + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender()); + } - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + @Override + protected void onBucketsUpdated(final Map> buckets) { + final Map> endpoints = new HashMap<>(buckets.size()); + + for (Entry> e : buckets.entrySet()) { + final RoutingTable table = e.getValue().getData(); + + final List rpcs = new ArrayList<>(table.getRoutes().size()); + for (RouteIdentifier ri : table.getRoutes()) { + if (ri instanceof RouteIdentifierImpl) { + final RouteIdentifierImpl id = (RouteIdentifierImpl) ri; + rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute())); + } else { + LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey()); + } + } + + endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() + : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs))); + } - getSender().tell("Success", self()); - } + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + } + } - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + public static final class RemoteRpcEndpoint { + private final Set rpcs; + private final ActorRef router; - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = Preconditions.checkNotNull(router); + this.rpcs = ImmutableSet.copyOf(rpcs); + } - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); + public ActorRef getRouter() { + return router; + } - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + public Set getRpcs() { + return rpcs; + } } - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); + /** + * All messages used by the RpcRegistry. + */ + public static class Messages { + abstract static class AbstractRouteMessage { + final List> routeIdentifiers; + + AbstractRouteMessage(final List> routeIdentifiers) { + Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.routeIdentifiers = routeIdentifiers; + } + + List> getRouteIdentifiers() { + return this.routeIdentifiers; + } + + @Override + public String toString() { + return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; + } + } - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } - } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; + public static final class AddOrUpdateRoutes extends AbstractRouteMessage { + public AddOrUpdateRoutes(final List> routeIdentifiers) { + super(routeIdentifiers); + } } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } + + public static final class RemoveRoutes extends AbstractRouteMessage { + public RemoveRoutes(final List> routeIdentifiers) { + super(routeIdentifiers); + } } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH); - } - } - return actor; - } - private class SendRoutingTable implements Runnable { + public static final class UpdateRemoteEndpoints { + private final Map> endpoints; - @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } + UpdateRemoteEndpoints(final Map> endpoints) { + this.endpoints = ImmutableMap.copyOf(endpoints); + } + + public Map> getEndpoints() { + return endpoints; + } + } } - } }