Bug 4866: Add wait/retries for routed RPCs
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index c2ff0456f7cd04b5da3289a70a4fdf29bce84bc3..b467ce949ae4a86e9500fffa9f05b85c0ba407e1 100644 (file)
@@ -8,13 +8,18 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
@@ -26,6 +31,7 @@ import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryM
 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
@@ -34,10 +40,13 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
 
     public RpcRegistry(RemoteRpcProviderConfig config) {
         super(config);
         getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
     }
 
     public static Props props(RemoteRpcProviderConfig config) {
@@ -56,6 +65,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
         } else {
             super.handleReceive(message);
         }
@@ -83,6 +94,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         updateLocalBucket(table);
+
+        onBucketsUpdated();
     }
 
     /**
@@ -101,19 +114,63 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     /**
      * Finds routers for the given rpc.
      *
-     * @param msg
+     * @param findRouters
      */
-    private void receiveGetRouter(FindRouters msg) {
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+        final ActorRef sender = getSender();
+        if(!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if(findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
+                    }
+                }
+            };
+
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+            Runnable timerRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+                    routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                    sender.tell(new Messages.FindRoutersReply(
+                            Collections.<Pair<ActorRef, Long>>emptyList()), self());
+                }
+            };
+
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
+        }
+    }
+
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
         findRoutes(getLocalBucket().getData(), routeId, routers);
 
         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
             findRoutes(bucket.getData(), routeId, routers);
         }
 
-        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+        boolean foundRouters = !routers.isEmpty();
+        if(foundRouters) {
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
+
+        return foundRouters;
     }
 
     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
@@ -128,6 +185,13 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
     }
 
+    @Override
+    protected void onBucketsUpdated() {
+        for(Runnable callBack: routesUpdatedCallbacks) {
+            callBack.run();
+        }
+    }
+
     /**
      * All messages used by the RpcRegistry
      */