BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index e36060cc13ece309f04f10adebdb74a62f158146..fc5b618663988d304d6b51468d2da9f99250d7b2 100644 (file)
  */
 package org.opendaylight.controller.remote.rpc.registry;
 
-import akka.actor.ActorSelection;
-import akka.actor.Address;
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.ActorConstants;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
-
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Random;
+import akka.japi.Option;
+import akka.japi.Pair;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
- *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
+ * Registry to look up cluster nodes that have registered for a given rpc.
  *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
  */
+public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
 
-public class RpcRegistry extends AbstractUntypedActor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
-  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
-  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-  private final ClusterWrapper clusterWrapper;
-  private final ScheduledFuture<?> syncScheduler;
-
-  private RpcRegistry(ClusterWrapper clusterWrapper){
-    this.routingTable = new RoutingTable<>();
-    this.clusterWrapper = clusterWrapper;
-    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
-  }
-
-  public static Props props(final ClusterWrapper clusterWrapper){
-    return Props.create(new Creator<RpcRegistry>(){
-
-      @Override
-      public RpcRegistry create() throws Exception {
-        return new RpcRegistry(clusterWrapper);
-      }
-    });
-  }
-
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    LOG.debug("Received message {}", message);
-    if(message instanceof RoutingTableData) {
-      syncRoutingTable((RoutingTableData) message);
-    } else if(message instanceof GetRoutedRpc) {
-      getRoutedRpc((GetRoutedRpc) message);
-    } else if(message instanceof GetRpc) {
-      getRpc((GetRpc) message);
-    } else if(message instanceof AddRpc) {
-      addRpc((AddRpc) message);
-    } else if(message instanceof RemoveRpc) {
-      removeRpc((RemoveRpc) message);
-    } else if(message instanceof AddRoutedRpc) {
-      addRoutedRpc((AddRoutedRpc) message);
-    } else if(message instanceof RemoveRoutedRpc) {
-      removeRoutedRpc((RemoveRoutedRpc) message);
+    public RpcRegistry(RemoteRpcProviderConfig config) {
+        super(config);
+        getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
     }
-  }
 
-  private void getRoutedRpc(GetRoutedRpc rpcMsg){
-    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
-    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+    public static Props props(RemoteRpcProviderConfig config) {
+        return Props.create(RpcRegistry.class, config);
+    }
 
-    getSender().tell(routedRpcReply, self());
-  }
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        //TODO: if sender is remote, reject message
 
-  private void getRpc(GetRpc rpcMsg) {
-    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
-    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+        if (message instanceof SetLocalRouter) {
+            receiveSetLocalRouter((SetLocalRouter) message);
+        } else if (message instanceof AddOrUpdateRoutes) {
+            receiveAddRoutes((AddOrUpdateRoutes) message);
+        } else if (message instanceof RemoveRoutes) {
+            receiveRemoveRoutes((RemoveRoutes) message);
+        } else if (message instanceof Messages.FindRouters) {
+            receiveGetRouter((FindRouters) message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
-    getSender().tell(rpcReply, self());
-  }
+    /**
+     * Registers a rpc broker.
+     *
+     * @param message contains {@link akka.actor.ActorRef} for rpc broker
+     */
+    private void receiveSetLocalRouter(SetLocalRouter message) {
+        getLocalBucket().getData().setRouter(message.getRouter());
+    }
 
-  private void addRpc(AddRpc rpcMsg) {
-    LOG.debug("Add Rpc to routing table {}", rpcMsg);
-    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-    getSender().tell("Success", self());
-  }
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
-  private void removeRpc(RemoveRpc rpcMsg) {
-    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
-    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
+        }
 
-    getSender().tell("Success", self());
-  }
+        updateLocalBucket(table);
 
-  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
-    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+        onBucketsUpdated();
+    }
 
-  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
-    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+    /**
+     * Processes a RemoveRoutes message.
+     *
+     * @param msg contains list of route ids to remove
+     */
+    private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-  private void syncRoutingTable(RoutingTableData routingTableData) {
-    LOG.debug("Syncing routing table {}", routingTableData);
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
+        }
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
-    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+        updateLocalBucket(table);
     }
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
-        routingTableData.getRoutedRpcMap();
-    routeIds = newRoutedRpcMap.keySet();
+    /**
+     * Finds routers for the given rpc.
+     *
+     * @param findRouters the FindRouters request
+     */
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+        final ActorRef sender = getSender();
+        if (!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if (findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
+                    }
+                }
+            };
+
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+            Runnable timerRunnable = () -> {
+                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
 
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
-      for(String routeAddress : routeAddresses) {
-        routingTable.addRoutedRpc(routeId, routeAddress);
-      }
+                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                sender.tell(new Messages.FindRoutersReply(
+                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
+            };
+
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
+        }
     }
-  }
-
-  private ActorSelection getRandomRegistryActor() {
-    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
-    ActorSelection actor = null;
-    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
-    int memberSize = members.size();
-    // Don't select yourself
-    if(memberSize > 1) {
-      Address currentNodeAddress = clusterWrapper.getAddress();
-      int index = new Random().nextInt(memberSize);
-      int i = 0;
-      // keeping previous member, in case when random index member is same as current actor
-      // and current actor member is last in set
-      Member previousMember = null;
-      for(Member member : members){
-        if(i == index-1) {
-          previousMember = member;
-        }
-        if(i == index) {
-          if(!currentNodeAddress.equals(member.address())) {
-            actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
-            break;
-          } else if(index < memberSize-1){ // pick the next element in the set
-            index++;
-          }
-        }
-        i++;
-      }
-      if(actor == null && previousMember != null) {
-        actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
-      }
+
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
+        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
+
+        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
+        }
+
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+        boolean foundRouters = !routers.isEmpty();
+        if (foundRouters) {
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
+
+        return foundRouters;
     }
-    return actor;
-  }
 
-  private class SendRoutingTable implements Runnable {
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
+
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if (!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
+    }
 
     @Override
-    public void run() {
-      RoutingTableData routingTableData =
-          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
-      LOG.debug("Sending routing table for sync {}", routingTableData);
-      ActorSelection actor = getRandomRegistryActor();
-      if(actor != null) {
-        actor.tell(routingTableData, self());
-      }
+    protected void onBucketsUpdated() {
+        if (routesUpdatedCallbacks.isEmpty()) {
+            return;
+        }
+
+        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+            callBack.run();
+        }
+    }
+
+    /**
+     * All messages used by the RpcRegistry.
+     */
+    public static class Messages {
+
+
+        public static class ContainsRoute {
+            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
+
+            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
+                this.routeIdentifiers = routeIdentifiers;
+            }
+
+            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+                return this.routeIdentifiers;
+            }
+
+            @Override
+            public String toString() {
+                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
+            }
+        }
+
+        public static class AddOrUpdateRoutes extends ContainsRoute {
+
+            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
+            }
+        }
+
+        public static class RemoveRoutes extends ContainsRoute {
+
+            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
+            }
+        }
+
+        public static class SetLocalRouter {
+            private final ActorRef router;
+
+            public SetLocalRouter(ActorRef router) {
+                Preconditions.checkArgument(router != null, "Router must not be null");
+                this.router = router;
+            }
+
+            public ActorRef getRouter() {
+                return this.router;
+            }
+
+            @Override
+            public String toString() {
+                return "SetLocalRouter{" + "router=" + router + '}';
+            }
+        }
+
+        public static class FindRouters {
+            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
+            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
+                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+                this.routeIdentifier = routeIdentifier;
+            }
+
+            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+                return routeIdentifier;
+            }
+
+            @Override
+            public String toString() {
+                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
+            }
+        }
+
+        public static class FindRoutersReply {
+            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+
+            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
+                this.routerWithUpdateTime = routerWithUpdateTime;
+            }
+
+            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
+                return routerWithUpdateTime;
+            }
+
+            @Override
+            public String toString() {
+                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
+            }
+        }
     }
-  }
 }