*/
package org.opendaylight.controller.remote.rpc.registry;
-import akka.actor.ActorSelection;
+import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.ActorConstants;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import akka.actor.UntypedActor;
+import akka.dispatch.Mapper;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Option;
+import akka.japi.Pair;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.concurrent.Future;
-import java.util.LinkedHashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
/**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
- *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
- *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
+ * Registry to look up cluster nodes that have registered for a given rpc.
+ * <p/>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
*/
+public class RpcRegistry extends UntypedActor {
+
+ final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ /**
+ * Store to keep the registry. Bucket store sync's it across nodes in the cluster
+ */
+ private ActorRef bucketStore;
-public class RpcRegistry extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
- private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final ClusterWrapper clusterWrapper;
- private final ScheduledFuture<?> syncScheduler;
-
- private RpcRegistry(ClusterWrapper clusterWrapper){
- this.routingTable = new RoutingTable<>();
- this.clusterWrapper = clusterWrapper;
- this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
- }
-
- public static Props props(final ClusterWrapper clusterWrapper){
- return Props.create(new Creator<RpcRegistry>(){
-
- @Override
- public RpcRegistry create() throws Exception {
- return new RpcRegistry(clusterWrapper);
- }
- });
- }
-
- @Override
- protected void handleReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message);
- if(message instanceof RoutingTableData) {
- syncRoutingTable((RoutingTableData) message);
- } else if(message instanceof GetRoutedRpc) {
- getRoutedRpc((GetRoutedRpc) message);
- } else if(message instanceof GetRpc) {
- getRpc((GetRpc) message);
- } else if(message instanceof AddRpc) {
- addRpc((AddRpc) message);
- } else if(message instanceof RemoveRpc) {
- removeRpc((RemoveRpc) message);
- } else if(message instanceof AddRoutedRpc) {
- addRoutedRpc((AddRoutedRpc) message);
- } else if(message instanceof RemoveRoutedRpc) {
- removeRoutedRpc((RemoveRoutedRpc) message);
+ /**
+ * Rpc broker that would use the registry to route requests.
+ */
+ private ActorRef localRouter;
+
+ public RpcRegistry() {
+ bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
+
+ log.info("Bucket store path = {}", bucketStore.path().toString());
}
- }
- private void getRoutedRpc(GetRoutedRpc rpcMsg){
- LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
- GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+ public RpcRegistry(ActorRef bucketStore) {
+ this.bucketStore = bucketStore;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+
+ log.debug("Received message: message [{}]", message);
+
+ //TODO: if sender is remote, reject message
- getSender().tell(routedRpcReply, self());
- }
+ if (message instanceof SetLocalRouter)
+ receiveSetLocalRouter((SetLocalRouter) message);
- private void getRpc(GetRpc rpcMsg) {
- LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
- GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+ if (message instanceof AddOrUpdateRoutes)
+ receiveAddRoutes((AddOrUpdateRoutes) message);
- getSender().tell(rpcReply, self());
- }
+ else if (message instanceof RemoveRoutes)
+ receiveRemoveRoutes((RemoveRoutes) message);
- private void addRpc(AddRpc rpcMsg) {
- LOG.debug("Add Rpc to routing table {}", rpcMsg);
- routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+ else if (message instanceof Messages.FindRouters)
+ receiveGetRouter((FindRouters) message);
- getSender().tell("Success", self());
- }
+ else
+ unhandled(message);
+ }
+
+ /**
+ * Register's rpc broker
+ *
+ * @param message contains {@link akka.actor.ActorRef} for rpc broker
+ */
+ private void receiveSetLocalRouter(SetLocalRouter message) {
+ localRouter = message.getRouter();
+ }
- private void removeRpc(RemoveRpc rpcMsg) {
- LOG.debug("Removing Rpc to routing table {}", rpcMsg);
- routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+ /**
+ * @param msg
+ */
+ private void receiveAddRoutes(AddOrUpdateRoutes msg) {
- getSender().tell("Success", self());
- }
+ Preconditions.checkState(localRouter != null, "Router must be set first");
- private void addRoutedRpc(AddRoutedRpc rpcMsg) {
- routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ }
- private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
- routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ /**
+ * @param msg contains list of route ids to remove
+ */
+ private void receiveRemoveRoutes(RemoveRoutes msg) {
- private void syncRoutingTable(RoutingTableData routingTableData) {
- LOG.debug("Syncing routing table {}", routingTableData);
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
- Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
- Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
}
- Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
- routingTableData.getRoutedRpcMap();
- routeIds = newRoutedRpcMap.keySet();
+ /**
+ * Finds routers for the given rpc.
+ *
+ * @param msg
+ */
+ private void receiveGetRouter(FindRouters msg) {
+ final ActorRef sender = getSender();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
- for(String routeAddress : routeAddresses) {
- routingTable.addRoutedRpc(routeId, routeAddress);
- }
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
+ futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
}
- }
-
- private ActorSelection getRandomRegistryActor() {
- ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
- ActorSelection actor = null;
- Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
- int memberSize = members.size();
- // Don't select yourself
- if(memberSize > 1) {
- Address currentNodeAddress = clusterWrapper.getAddress();
- int index = new Random().nextInt(memberSize);
- int i = 0;
- // keeping previous member, in case when random index member is same as current actor
- // and current actor member is last in set
- Member previousMember = null;
- for(Member member : members){
- if(i == index-1) {
- previousMember = member;
- }
- if(i == index) {
- if(!currentNodeAddress.equals(member.address())) {
- actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
- break;
- } else if(index < memberSize-1){ // pick the next element in the set
- index++;
- }
+
+ /**
+ * Helper to create empty reply when no routers are found
+ *
+ * @return
+ */
+ private Messages.FindRoutersReply createEmptyReply() {
+ List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
+ return new Messages.FindRoutersReply(routerWithUpdateTime);
+ }
+
+ /**
+ * Helper to create a reply when routers are found for the given rpc
+ *
+ * @param buckets
+ * @param routeId
+ * @return
+ */
+ private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+
+ List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+ Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
+
+ for (Bucket bucket : buckets.values()) {
+
+ RoutingTable table = (RoutingTable) bucket.getData();
+ if (table == null)
+ continue;
+
+ routerWithUpdateTime = table.getRouterFor(routeId);
+ if (routerWithUpdateTime.isEmpty())
+ continue;
+
+ routers.add(routerWithUpdateTime.get());
}
- i++;
- }
- if(actor == null && previousMember != null) {
- actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
- }
+
+ return new Messages.FindRoutersReply(routers);
}
- return actor;
- }
- private class SendRoutingTable implements Runnable {
- @Override
- public void run() {
- RoutingTableData routingTableData =
- new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
- LOG.debug("Sending routing table for sync {}", routingTableData);
- ActorSelection actor = getRandomRegistryActor();
- if(actor != null) {
- actor.tell(routingTableData, self());
- }
+ ///
+ ///private factories to create Mapper
+ ///
+
+ /**
+ * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+ *
+ * @param routeId the rpc
+ * @param sender client who asked to find the routers.
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+
+ if (replyMessage instanceof GetAllBucketsReply) {
+
+ GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
+ Map<Address, Bucket> buckets = reply.getBuckets();
+
+ if (buckets == null || buckets.isEmpty()) {
+ sender.tell(createEmptyReply(), getSelf());
+ return null;
+ }
+
+ sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
+ * it updates the local bucket in bucket store.
+ *
+ * @param routeIds rpc to remote
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetLocalBucketReply) {
+
+ GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+ Bucket<RoutingTable> bucket = reply.getBucket();
+
+ if (bucket == null) {
+ log.debug("Local bucket is null");
+ return null;
+ }
+
+ RoutingTable table = bucket.getData();
+ if (table == null)
+ table = new RoutingTable();
+
+ table.setRouter(localRouter);
+
+ if (!table.isEmpty()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.removeRoute(routeId);
+ }
+ }
+ bucket.setData(table);
+
+ UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+ bucketStore.tell(updateBucketMessage, getSelf());
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
+ * it updates the local bucket in bucket store.
+ *
+ * @param routeIds rpc to add
+ * @return
+ */
+ private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
+
+ return new Mapper<Object, Void>() {
+ @Override
+ public Void apply(Object replyMessage) {
+ if (replyMessage instanceof GetLocalBucketReply) {
+
+ GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
+ Bucket<RoutingTable> bucket = reply.getBucket();
+
+ if (bucket == null) {
+ log.debug("Local bucket is null");
+ return null;
+ }
+
+ RoutingTable table = bucket.getData();
+ if (table == null)
+ table = new RoutingTable();
+
+ table.setRouter(localRouter);
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.addRoute(routeId);
+ }
+
+ bucket.setData(table);
+
+ UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
+ bucketStore.tell(updateBucketMessage, getSelf());
+ }
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * All messages used by the RpcRegistry
+ */
+ public static class Messages {
+
+
+ public static class ContainsRoute {
+ final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
+
+ public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ Preconditions.checkArgument(routeIdentifiers != null &&
+ !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.routeIdentifiers = routeIdentifiers;
+ }
+
+ public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+ return this.routeIdentifiers;
+ }
+
+ @Override
+ public String toString() {
+ return "ContainsRoute{" +
+ "routeIdentifiers=" + routeIdentifiers +
+ '}';
+ }
+ }
+
+ public static class AddOrUpdateRoutes extends ContainsRoute {
+
+ public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
+ }
+
+ public static class RemoveRoutes extends ContainsRoute {
+
+ public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
+ }
+
+ public static class SetLocalRouter {
+ private final ActorRef router;
+
+ public SetLocalRouter(ActorRef router) {
+ Preconditions.checkArgument(router != null, "Router must not be null");
+ this.router = router;
+ }
+
+ public ActorRef getRouter() {
+ return this.router;
+ }
+
+ @Override
+ public String toString() {
+ return "SetLocalRouter{" +
+ "router=" + router +
+ '}';
+ }
+ }
+
+ public static class FindRouters {
+ private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
+ public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+ Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+ return routeIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRouters{" +
+ "routeIdentifier=" + routeIdentifier +
+ '}';
+ }
+ }
+
+ public static class FindRoutersReply {
+ final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+
+ public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+ Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
+ this.routerWithUpdateTime = routerWithUpdateTime;
+ }
+
+ public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
+ return routerWithUpdateTime;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRoutersReply{" +
+ "routerWithUpdateTime=" + routerWithUpdateTime +
+ '}';
+ }
+ }
}
- }
}