X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=51609870cc4aad1c8789dfdcd0b68f04563b5cdf;hp=e36060cc13ece309f04f10adebdb74a62f158146;hb=73b088ee110766618a8728eed653b15cef896cf1;hpb=fcb707ea1dadb4775c80327adef7edac67ff9db0 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index e36060cc13..2c89f14260 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,197 +7,194 @@ */ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorSelection; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.cluster.ClusterEvent; -import akka.cluster.Member; -import akka.japi.Creator; -import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; -import org.opendaylight.controller.remote.rpc.ActorConstants; -import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.AddRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; -import org.opendaylight.controller.remote.rpc.messages.GetRpc; -import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; -import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; -import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; -import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; - -import java.util.LinkedHashSet; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; +import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; /** - * This Actor maintains the routing table state and sync it with other nodes in the cluster. - * - * A scheduler runs after an interval of time, which pick a random member from the cluster - * and send the current state of routing table to the member. + * Registry to look up cluster nodes that have registered for a given RPC. * - * when a message of routing table data is received, it gets merged with the local routing table - * to keep the latest data. + *

+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this + * cluster wide information. */ +public class RpcRegistry extends BucketStoreActor { + private final ActorRef rpcRegistrar; + private RemoteRpcRegistryMXBeanImpl mxBean; -public class RpcRegistry extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); - private RoutingTable, String> routingTable; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private final ClusterWrapper clusterWrapper; - private final ScheduledFuture syncScheduler; - - private RpcRegistry(ClusterWrapper clusterWrapper){ - this.routingTable = new RoutingTable<>(); - this.clusterWrapper = clusterWrapper; - this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); - } - - public static Props props(final ClusterWrapper clusterWrapper){ - return Props.create(new Creator(){ - - @Override - public RpcRegistry create() throws Exception { - return new RpcRegistry(clusterWrapper); - } - }); - } - - @Override - protected void handleReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - if(message instanceof RoutingTableData) { - syncRoutingTable((RoutingTableData) message); - } else if(message instanceof GetRoutedRpc) { - getRoutedRpc((GetRoutedRpc) message); - } else if(message instanceof GetRpc) { - getRpc((GetRpc) message); - } else if(message instanceof AddRpc) { - addRpc((AddRpc) message); - } else if(message instanceof RemoveRpc) { - removeRpc((RemoveRpc) message); - } else if(message instanceof AddRoutedRpc) { - addRoutedRpc((AddRoutedRpc) message); - } else if(message instanceof RemoveRoutedRpc) { - removeRoutedRpc((RemoveRoutedRpc) message); - } - } + public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); + this.rpcRegistrar = requireNonNull(rpcRegistrar); - private void getRoutedRpc(GetRoutedRpc rpcMsg){ - LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); - GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + } - getSender().tell(routedRpcReply, self()); - } + /** + * Create a new props instance for instantiating an RpcRegistry actor. + * + * @param config Provider configuration + * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes + * @param rpcInvoker Actor handling RPC invocation requests from remote nodes + * @return A new {@link Props} instance + */ + public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { + return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); + } - private void getRpc(GetRpc rpcMsg) { - LOG.debug("Get global Rpc location from routing table {}", rpcMsg); - String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); - GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + @Override + public void preStart() { + super.preStart(); + mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), + getConfig().getAskDuration()), getConfig().getAskDuration()); + } - getSender().tell(rpcReply, self()); - } + @Override + public void postStop() throws Exception { + if (mxBean != null) { + mxBean.unregister(); + mxBean = null; + } + super.postStop(); + } - private void addRpc(AddRpc rpcMsg) { - LOG.debug("Add Rpc to routing table {}", rpcMsg); - routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + @Override + protected void handleCommand(final Object message) throws Exception { + if (message instanceof AddOrUpdateRoutes) { + receiveAddRoutes((AddOrUpdateRoutes) message); + } else if (message instanceof RemoveRoutes) { + receiveRemoveRoutes((RemoveRoutes) message); + } else { + super.handleCommand(message); + } + } - getSender().tell("Success", self()); - } + private void receiveAddRoutes(final AddOrUpdateRoutes msg) { + LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers())); + } - private void removeRpc(RemoveRpc rpcMsg) { - LOG.debug("Removing Rpc to routing table {}", rpcMsg); - routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + /** + * Processes a RemoveRoutes message. + * + * @param msg contains list of route ids to remove + */ + private void receiveRemoveRoutes(final RemoveRoutes msg) { + LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers()); + updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers())); + } - getSender().tell("Success", self()); - } + @Override + protected void onBucketRemoved(final Address address, final Bucket bucket) { + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), + ActorRef.noSender()); + } - private void addRoutedRpc(AddRoutedRpc rpcMsg) { - routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + @Override + protected void onBucketsUpdated(final Map> buckets) { + final Map> endpoints = new HashMap<>(buckets.size()); - private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { - routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); - getSender().tell("Success", self()); - } + for (Entry> e : buckets.entrySet()) { + final RoutingTable table = e.getValue().getData(); - private void syncRoutingTable(RoutingTableData routingTableData) { - LOG.debug("Syncing routing table {}", routingTableData); + final Collection rpcs = table.getItems(); + endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() + : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs))); + } - Map, String> newRpcMap = routingTableData.getRpcMap(); - Set> routeIds = newRpcMap.keySet(); - for(RpcRouter.RouteIdentifier routeId : routeIds) { - routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + if (!endpoints.isEmpty()) { + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + } } - Map, LinkedHashSet> newRoutedRpcMap = - routingTableData.getRoutedRpcMap(); - routeIds = newRoutedRpcMap.keySet(); + public static final class RemoteRpcEndpoint { + private final Set rpcs; + private final ActorRef router; - for(RpcRouter.RouteIdentifier routeId : routeIds) { - Set routeAddresses = newRoutedRpcMap.get(routeId); - for(String routeAddress : routeAddresses) { - routingTable.addRoutedRpc(routeId, routeAddress); - } - } - } - - private ActorSelection getRandomRegistryActor() { - ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); - ActorSelection actor = null; - Set members = JavaConversions.asJavaSet(clusterState.members()); - int memberSize = members.size(); - // Don't select yourself - if(memberSize > 1) { - Address currentNodeAddress = clusterWrapper.getAddress(); - int index = new Random().nextInt(memberSize); - int i = 0; - // keeping previous member, in case when random index member is same as current actor - // and current actor member is last in set - Member previousMember = null; - for(Member member : members){ - if(i == index-1) { - previousMember = member; + @VisibleForTesting + public RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = requireNonNull(router); + this.rpcs = ImmutableSet.copyOf(rpcs); } - if(i == index) { - if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH); - break; - } else if(index < memberSize-1){ // pick the next element in the set - index++; - } + + public ActorRef getRouter() { + return router; + } + + public Set getRpcs() { + return rpcs; } - i++; - } - if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH); - } } - return actor; - } - private class SendRoutingTable implements Runnable { + /** + * All messages used by the RpcRegistry. + */ + public static class Messages { + abstract static class AbstractRouteMessage { + final List rpcRouteIdentifiers; + + AbstractRouteMessage(final Collection rpcRouteIdentifiers) { + checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(), + "Route Identifiers must be supplied"); + this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers); + } + + List getRouteIdentifiers() { + return this.rpcRouteIdentifiers; + } + + @Override + public String toString() { + return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}'; + } + } + + public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage { + public AddOrUpdateRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); + } - @Override - public void run() { - RoutingTableData routingTableData = - new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); - LOG.debug("Sending routing table for sync {}", routingTableData); - ActorSelection actor = getRandomRegistryActor(); - if(actor != null) { - actor.tell(routingTableData, self()); - } + } + + public static final class RemoveRoutes extends AbstractRouteMessage { + public RemoveRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); + } + } + + public static final class UpdateRemoteEndpoints { + private final Map> rpcEndpoints; + + + @VisibleForTesting + public UpdateRemoteEndpoints(final Map> rpcEndpoints) { + this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints); + } + + public Map> getRpcEndpoints() { + return rpcEndpoints; + } + } } - } }