X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=845c1c819a70ca6bac0fb1a717b31f7861b6a6b6;hb=71c540b3572415aef56acd4a31b503f24e9da437;hp=7cb505aa98d3d8c81c70cae5bfd5292f4da5ffd2;hpb=c46e223995956f1f759c551163c212947c1e2fb7;p=controller.git
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
index 7cb505aa98..845c1c819a 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
@@ -7,196 +7,221 @@
*/
package org.opendaylight.controller.remote.rpc.registry;
-import akka.actor.ActorSelection;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import akka.actor.ActorRef;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Option;
+import akka.japi.Pair;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
-
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
/**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
- *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
- *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
+ * Registry to look up cluster nodes that have registered for a given rpc.
+ *
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
*/
+public class RpcRegistry extends BucketStore {
+
+ final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-public class RpcRegistry extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
- private RoutingTable, String> routingTable;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final ClusterWrapper clusterWrapper;
- private final ScheduledFuture> syncScheduler;
-
- private RpcRegistry(ClusterWrapper clusterWrapper){
- this.routingTable = new RoutingTable<>();
- this.clusterWrapper = clusterWrapper;
- this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
- }
-
- public static Props props(final ClusterWrapper clusterWrapper){
- return Props.create(new Creator(){
-
- @Override
- public RpcRegistry create() throws Exception {
- return new RpcRegistry(clusterWrapper);
- }
- });
- }
-
- @Override
- protected void handleReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message);
- if(message instanceof RoutingTableData) {
- syncRoutingTable((RoutingTableData) message);
- } else if(message instanceof GetRoutedRpc) {
- getRoutedRpc((GetRoutedRpc) message);
- } else if(message instanceof GetRpc) {
- getRpc((GetRpc) message);
- } else if(message instanceof AddRpc) {
- addRpc((AddRpc) message);
- } else if(message instanceof RemoveRpc) {
- removeRpc((RemoveRpc) message);
- } else if(message instanceof AddRoutedRpc) {
- addRoutedRpc((AddRoutedRpc) message);
- } else if(message instanceof RemoveRoutedRpc) {
- removeRoutedRpc((RemoveRoutedRpc) message);
+ public RpcRegistry() {
+ getLocalBucket().setData(new RoutingTable());
}
- }
- private void getRoutedRpc(GetRoutedRpc rpcMsg){
- LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
- GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ //TODO: if sender is remote, reject message
+
+ if (message instanceof SetLocalRouter) {
+ receiveSetLocalRouter((SetLocalRouter) message);
+ } else if (message instanceof AddOrUpdateRoutes) {
+ receiveAddRoutes((AddOrUpdateRoutes) message);
+ } else if (message instanceof RemoveRoutes) {
+ receiveRemoveRoutes((RemoveRoutes) message);
+ } else if (message instanceof Messages.FindRouters) {
+ receiveGetRouter((FindRouters) message);
+ } else {
+ super.handleReceive(message);
+ }
+ }
- getSender().tell(routedRpcReply, self());
- }
+ /**
+ * Register's rpc broker
+ *
+ * @param message contains {@link akka.actor.ActorRef} for rpc broker
+ */
+ private void receiveSetLocalRouter(SetLocalRouter message) {
+ getLocalBucket().getData().setRouter(message.getRouter());
+ }
- private void getRpc(GetRpc rpcMsg) {
- LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
- String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
- GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+ /**
+ * @param msg
+ */
+ private void receiveAddRoutes(AddOrUpdateRoutes msg) {
- getSender().tell(rpcReply, self());
- }
+ log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
- private void addRpc(AddRpc rpcMsg) {
- LOG.debug("Add Rpc to routing table {}", rpcMsg);
- routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+ RoutingTable table = getLocalBucket().getData().copy();
+ for(RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.addRoute(routeId);
+ }
- getSender().tell("Success", self());
- }
+ updateLocalBucket(table);
+ }
- private void removeRpc(RemoveRpc rpcMsg) {
- LOG.debug("Removing Rpc to routing table {}", rpcMsg);
- routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+ /**
+ * @param msg contains list of route ids to remove
+ */
+ private void receiveRemoveRoutes(RemoveRoutes msg) {
- getSender().tell("Success", self());
- }
+ RoutingTable table = getLocalBucket().getData().copy();
+ for (RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.removeRoute(routeId);
+ }
- private void addRoutedRpc(AddRoutedRpc rpcMsg) {
- routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ updateLocalBucket(table);
+ }
- private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
- routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
- getSender().tell("Success", self());
- }
+ /**
+ * Finds routers for the given rpc.
+ *
+ * @param msg
+ */
+ private void receiveGetRouter(FindRouters msg) {
+ List> routers = new ArrayList<>();
- private void syncRoutingTable(RoutingTableData routingTableData) {
- LOG.debug("Syncing routing table {}", routingTableData);
+ RouteIdentifier, ?, ?> routeId = msg.getRouteIdentifier();
+ findRoutes(getLocalBucket().getData(), routeId, routers);
- Map, String> newRpcMap = routingTableData.getRpcMap();
- Set> routeIds = newRpcMap.keySet();
- for(RpcRouter.RouteIdentifier, ?, ?> routeId : routeIds) {
- routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+ for(Bucket bucket : getRemoteBuckets().values()) {
+ findRoutes(bucket.getData(), routeId, routers);
+ }
+
+ getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
}
- Map, LinkedHashSet> newRoutedRpcMap =
- routingTableData.getRoutedRpcMap();
- routeIds = newRoutedRpcMap.keySet();
+ private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier, ?, ?> routeId,
+ List> routers) {
+ if (table == null) {
+ return;
+ }
- for(RpcRouter.RouteIdentifier, ?, ?> routeId : routeIds) {
- Set routeAddresses = newRoutedRpcMap.get(routeId);
- for(String routeAddress : routeAddresses) {
- routingTable.addRoutedRpc(routeId, routeAddress);
- }
+ Option> routerWithUpdateTime = table.getRouterFor(routeId);
+ if(!routerWithUpdateTime.isEmpty()) {
+ routers.add(routerWithUpdateTime.get());
+ }
}
- }
-
- private ActorSelection getRandomRegistryActor() {
- ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
- ActorSelection actor = null;
- Set members = JavaConversions.asJavaSet(clusterState.members());
- int memberSize = members.size();
- // Don't select yourself
- if(memberSize > 1) {
- Address currentNodeAddress = clusterWrapper.getAddress();
- int index = new Random().nextInt(memberSize);
- int i = 0;
- // keeping previous member, in case when random index member is same as current actor
- // and current actor member is last in set
- Member previousMember = null;
- for(Member member : members){
- if(i == index-1) {
- previousMember = member;
+
+ /**
+ * All messages used by the RpcRegistry
+ */
+ public static class Messages {
+
+
+ public static class ContainsRoute {
+ final List> routeIdentifiers;
+
+ public ContainsRoute(List> routeIdentifiers) {
+ Preconditions.checkArgument(routeIdentifiers != null &&
+ !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.routeIdentifiers = routeIdentifiers;
+ }
+
+ public List> getRouteIdentifiers() {
+ return this.routeIdentifiers;
+ }
+
+ @Override
+ public String toString() {
+ return "ContainsRoute{" +
+ "routeIdentifiers=" + routeIdentifiers +
+ '}';
+ }
}
- if(i == index) {
- if(!currentNodeAddress.equals(member.address())) {
- actor = this.context().actorSelection(member.address() + "/user/rpc-registry");
- break;
- } else if(index < memberSize-1){ // pick the next element in the set
- index++;
- }
+
+ public static class AddOrUpdateRoutes extends ContainsRoute {
+
+ public AddOrUpdateRoutes(List> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
}
- i++;
- }
- if(actor == null && previousMember != null) {
- actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry");
- }
- }
- return actor;
- }
- private class SendRoutingTable implements Runnable {
+ public static class RemoveRoutes extends ContainsRoute {
- @Override
- public void run() {
- RoutingTableData routingTableData =
- new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
- LOG.debug("Sending routing table for sync {}", routingTableData);
- ActorSelection actor = getRandomRegistryActor();
- if(actor != null) {
- actor.tell(routingTableData, self());
- }
+ public RemoveRoutes(List> routeIdentifiers) {
+ super(routeIdentifiers);
+ }
+ }
+
+ public static class SetLocalRouter {
+ private final ActorRef router;
+
+ public SetLocalRouter(ActorRef router) {
+ Preconditions.checkArgument(router != null, "Router must not be null");
+ this.router = router;
+ }
+
+ public ActorRef getRouter() {
+ return this.router;
+ }
+
+ @Override
+ public String toString() {
+ return "SetLocalRouter{" +
+ "router=" + router +
+ '}';
+ }
+ }
+
+ public static class FindRouters {
+ private final RpcRouter.RouteIdentifier, ?, ?> routeIdentifier;
+
+ public FindRouters(RpcRouter.RouteIdentifier, ?, ?> routeIdentifier) {
+ Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ public RpcRouter.RouteIdentifier, ?, ?> getRouteIdentifier() {
+ return routeIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRouters{" +
+ "routeIdentifier=" + routeIdentifier +
+ '}';
+ }
+ }
+
+ public static class FindRoutersReply {
+ final List> routerWithUpdateTime;
+
+ public FindRoutersReply(List> routerWithUpdateTime) {
+ Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
+ this.routerWithUpdateTime = routerWithUpdateTime;
+ }
+
+ public List> getRouterWithUpdateTime() {
+ return routerWithUpdateTime;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRoutersReply{" +
+ "routerWithUpdateTime=" + routerWithUpdateTime +
+ '}';
+ }
+ }
}
- }
}