*/
package org.opendaylight.controller.remote.rpc.registry;
-import akka.actor.ActorSelection;
+import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
-
-import java.util.LinkedHashSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Random;
+import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
/**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ * Registry to look up cluster nodes that have registered for a given RPC.
*
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
- *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
+ * cluster wide information.
*/
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
+ private final ActorRef rpcRegistrar;
+
+ public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+ this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+ }
-public class RpcRegistry extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
- private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final ClusterWrapper clusterWrapper;
- private final ScheduledFuture<?> syncScheduler;
-
- private RpcRegistry(ClusterWrapper clusterWrapper){
- this.routingTable = new RoutingTable<>();
- this.clusterWrapper = clusterWrapper;
- this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
- }
-
- public static Props props(final ClusterWrapper clusterWrapper){
- return Props.create(new Creator<RpcRegistry>(){
-
- @Override
- public RpcRegistry create() throws Exception {
- return new RpcRegistry(clusterWrapper);
- }
- });
- }
-
- @Override
- protected void handleReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message);
- if(message instanceof RoutingTableData) {
- syncRoutingTable((RoutingTableData) message);
- } else if(message instanceof GetRoutedRpc) {
- getRoutedRpc((GetRoutedRpc) message);
- } else if(message instanceof GetRpc) {
- getRpc((GetRpc) message);
- } else if(message instanceof AddRpc) {
- addRpc((AddRpc) message);
- } else if(message instanceof RemoveRpc) {
- removeRpc((RemoveRpc) message);
- } else if(message instanceof AddRoutedRpc) {
- addRoutedRpc((AddRoutedRpc) message);
- } else if(message instanceof RemoveRoutedRpc) {
- removeRoutedRpc((RemoveRoutedRpc) message);
+ /**
+ * Create a new props instance for instantiating an RpcRegistry actor.
+ *
+ * @param config Provider configuration
+ * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
+ */
+ public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
- }
- private void getRoutedRpc(GetRoutedRpc rpcMsg){
- LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
- GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+ @Override
+ protected void handleCommand(final Object message) throws Exception {
+ if (message instanceof AddOrUpdateRoutes) {
+ receiveAddRoutes((AddOrUpdateRoutes) message);
+ } else if (message instanceof RemoveRoutes) {
+ receiveRemoveRoutes((RemoveRoutes) message);
+ } else {
+ super.handleCommand(message);
+ }
+ }
- getSender().tell(routedRpcReply, self());
- }
+ private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+ LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+ updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
+ }
- private void getRpc(GetRpc rpcMsg) {
- LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
- GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+ /**
+ * Processes a RemoveRoutes message.
+ *
+ * @param msg contains list of route ids to remove
+ */
+ private void receiveRemoveRoutes(final RemoveRoutes msg) {
+ LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+ updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
+ }
- getSender().tell(rpcReply, self());
- }
+ @Override
+ protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+ rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
+ }
- private void addRpc(AddRpc rpcMsg) {
- LOG.debug("Add Rpc to routing table {}", rpcMsg);
- routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+ final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
- getSender().tell("Success", self());
- }
+ for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+ final RoutingTable table = e.getValue().getData();
- private void removeRpc(RemoveRpc rpcMsg) {
- LOG.debug("Removing Rpc to routing table {}", rpcMsg);
- routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+ final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
+ endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+ : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
+ }
- getSender().tell("Success", self());
- }
+ if (!endpoints.isEmpty()) {
+ rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+ }
+ }
- private void addRoutedRpc(AddRoutedRpc rpcMsg) {
- routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ public static final class RemoteRpcEndpoint {
+ private final Set<DOMRpcIdentifier> rpcs;
+ private final ActorRef router;
- private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
- routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+ this.router = Preconditions.checkNotNull(router);
+ this.rpcs = ImmutableSet.copyOf(rpcs);
+ }
- private void syncRoutingTable(RoutingTableData routingTableData) {
- LOG.debug("Syncing routing table {}", routingTableData);
+ public ActorRef getRouter() {
+ return router;
+ }
- Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+ public Set<DOMRpcIdentifier> getRpcs() {
+ return rpcs;
+ }
}
- Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
- routingTableData.getRoutedRpcMap();
- routeIds = newRoutedRpcMap.keySet();
+ /**
+ * All messages used by the RpcRegistry.
+ */
+ public static class Messages {
+ abstract static class AbstractRouteMessage {
+ final List<DOMRpcIdentifier> routeIdentifiers;
+
+ AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
+ Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
+ }
+
+ List<DOMRpcIdentifier> getRouteIdentifiers() {
+ return this.routeIdentifiers;
+ }
+
+ @Override
+ public String toString() {
+ return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
+ }
+ }
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
- for(String routeAddress : routeAddresses) {
- routingTable.addRoutedRpc(routeId, routeAddress);
- }
- }
- }
-
- private ActorSelection getRandomRegistryActor() {
- ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
- ActorSelection actor = null;
- Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
- int memberSize = members.size();
- // Don't select yourself
- if(memberSize > 1) {
- Address currentNodeAddress = clusterWrapper.getAddress();
- int index = new Random().nextInt(memberSize);
- int i = 0;
- // keeping previous member, in case when random index member is same as current actor
- // and current actor member is last in set
- Member previousMember = null;
- for(Member member : members){
- if(i == index-1) {
- previousMember = member;
+ public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+ public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
}
- if(i == index) {
- if(!currentNodeAddress.equals(member.address())) {
- actor = this.context().actorSelection(member.address() + "/user/rpc-registry");
- break;
- } else if(index < memberSize-1){ // pick the next element in the set
- index++;
- }
+
+ public static final class RemoveRoutes extends AbstractRouteMessage {
+ public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
}
- i++;
- }
- if(actor == null && previousMember != null) {
- actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry");
- }
- }
- return actor;
- }
- private class SendRoutingTable implements Runnable {
+ public static final class UpdateRemoteEndpoints {
+ private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
- @Override
- public void run() {
- RoutingTableData routingTableData =
- new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
- LOG.debug("Sending routing table for sync {}", routingTableData);
- ActorSelection actor = getRandomRegistryActor();
- if(actor != null) {
- actor.tell(routingTableData, self());
- }
+ UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+ this.endpoints = ImmutableMap.copyOf(endpoints);
+ }
+
+ public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
+ return endpoints;
+ }
+ }
}
- }
}