BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 845c1c819a70ca6bac0fb1a717b31f7861b6a6b6..fc5b618663988d304d6b51468d2da9f99250d7b2 100644 (file)
@@ -8,13 +8,18 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.Cancellable;
+import akka.actor.Props;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
@@ -23,19 +28,27 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
+ *
+ * <p>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
 
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
-    public RpcRegistry() {
+    public RpcRegistry(RemoteRpcProviderConfig config) {
+        super(config);
         getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+    }
+
+    public static Props props(RemoteRpcProviderConfig config) {
+        return Props.create(RpcRegistry.class, config);
     }
 
     @Override
@@ -50,13 +63,15 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
         } else {
             super.handleReceive(message);
         }
     }
 
     /**
-     * Register's rpc broker
+     * Registers a rpc broker.
      *
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
@@ -64,22 +79,23 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         getLocalBucket().getData().setRouter(message.getRouter());
     }
 
-    /**
-     * @param msg
-     */
     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
         log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
         RoutingTable table = getLocalBucket().getData().copy();
-        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
             table.addRoute(routeId);
         }
 
         updateLocalBucket(table);
+
+        onBucketsUpdated();
     }
 
     /**
+     * Processes a RemoveRoutes message.
+     *
      * @param msg contains list of route ids to remove
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
@@ -95,19 +111,60 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     /**
      * Finds routers for the given rpc.
      *
-     * @param msg
+     * @param findRouters the FindRouters request
      */
-    private void receiveGetRouter(FindRouters msg) {
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+        final ActorRef sender = getSender();
+        if (!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if (findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
+                    }
+                }
+            };
+
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+            Runnable timerRunnable = () -> {
+                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                sender.tell(new Messages.FindRoutersReply(
+                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
+            };
+
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
+        }
+    }
+
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
         findRoutes(getLocalBucket().getData(), routeId, routers);
 
-        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
             findRoutes(bucket.getData(), routeId, routers);
         }
 
-        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+        boolean foundRouters = !routers.isEmpty();
+        if (foundRouters) {
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
+
+        return foundRouters;
     }
 
     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
@@ -117,13 +174,24 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
-        if(!routerWithUpdateTime.isEmpty()) {
+        if (!routerWithUpdateTime.isEmpty()) {
             routers.add(routerWithUpdateTime.get());
         }
     }
 
+    @Override
+    protected void onBucketsUpdated() {
+        if (routesUpdatedCallbacks.isEmpty()) {
+            return;
+        }
+
+        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+            callBack.run();
+        }
+    }
+
     /**
-     * All messages used by the RpcRegistry
+     * All messages used by the RpcRegistry.
      */
     public static class Messages {
 
@@ -132,9 +200,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null &&
-                                            !routeIdentifiers.isEmpty(),
-                                            "Route Identifiers must be supplied");
+                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
                 this.routeIdentifiers = routeIdentifiers;
             }
 
@@ -144,9 +211,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
 
             @Override
             public String toString() {
-                return "ContainsRoute{" +
-                        "routeIdentifiers=" + routeIdentifiers +
-                        '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
             }
         }
 
@@ -178,9 +243,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
 
             @Override
             public String toString() {
-                return "SetLocalRouter{" +
-                        "router=" + router +
-                        '}';
+                return "SetLocalRouter{" + "router=" + router + '}';
             }
         }
 
@@ -198,9 +261,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
 
             @Override
             public String toString() {
-                return "FindRouters{" +
-                        "routeIdentifier=" + routeIdentifier +
-                        '}';
+                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
             }
         }
 
@@ -218,9 +279,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
 
             @Override
             public String toString() {
-                return "FindRoutersReply{" +
-                        "routerWithUpdateTime=" + routerWithUpdateTime +
-                        '}';
+                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
             }
         }
     }