Bug 4866: Add wait/retries for routed RPCs 75/34175/4
author Tom Pantelis <tpanteli@brocade.com>
Fri, 5 Feb 2016 07:42:54 +0000 (02:42 -0500)
committer Tom Pantelis <tpanteli@brocade.com>
Tue, 23 Feb 2016 22:01:08 +0000 (17:01 -0500)
If a routed RPC is registered on one node it takes a little time for the
route to propagate via gossip to other nodes. If another node tries to
invoke the RPC prior to propagation it fails. To alleviate this timing
issue, I added wait/retries via a timer in the RpcRegistry for the
FindRouters message. Each time routes are updated via gossip, the registry
retries the pending FindRouters request. If the timer expires first, the
registry sends back an empty list.
The timer period is 10 times the gossip tick interval (500ms * 10 = 5s).

Change-Id: Iaafcfb4c93cde44f62f6645c8b8684102ac0d0db
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties [new file with mode: 0644]

index 6f3a10e..995b1d5 100644 (file)
@@ -103,6 +103,12 @@ public class RemoteRpcProviderConfig extends CommonConfig {
 
         }
 
+        public Builder gossipTickInterval(String interval) { // overrides the gossip tick interval setting, e.g. "200ms"
+            configHolder.put(TAG_GOSSIP_TICK_INTERVAL, interval);
+            return this; // returns this builder so calls can be chained
+        }
+
+        @Override
         public RemoteRpcProviderConfig build(){
             return new RemoteRpcProviderConfig(merge());
         }
index c2ff045..b467ce9 100644 (file)
@@ -8,13 +8,18 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
@@ -26,6 +31,7 @@ import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryM
 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
@@ -34,10 +40,13 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
 
     public RpcRegistry(RemoteRpcProviderConfig config) {
         super(config);
         getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10); // FindRouters requests wait up to 10 gossip ticks before timing out
     }
 
     public static Props props(RemoteRpcProviderConfig config) {
@@ -56,6 +65,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
         } else {
             super.handleReceive(message);
         }
@@ -83,6 +94,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         updateLocalBucket(table);
+
+        onBucketsUpdated();
     }
 
     /**
@@ -101,19 +114,63 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     /**
      * Finds routers for the given rpc.
      *
-     * @param msg
+     * @param findRouters
      */
-    private void receiveGetRouter(FindRouters msg) {
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+        final ActorRef sender = getSender();
+        if(!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if(findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
+                    }
+                }
+            };
+
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+            Runnable timerRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+                    routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                    sender.tell(new Messages.FindRoutersReply(
+                            Collections.<Pair<ActorRef, Long>>emptyList()), self());
+                }
+            };
+
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
+        }
+    }
+
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) { // searches the local and remote buckets; replies only when a router is found
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
         findRoutes(getLocalBucket().getData(), routeId, routers);
 
         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
             findRoutes(bucket.getData(), routeId, routers);
         }
 
-        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+        boolean foundRouters = !routers.isEmpty();
+        if(foundRouters) { // an empty result is not sent from here; the caller decides what to do with it
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
+
+        return foundRouters; // true if a reply was sent to sender
     }
 
     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
@@ -128,6 +185,13 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
     }
 
+    /**
+     * Re-runs every pending routes-updated callback after the buckets have changed.
+     */
+    @Override
+    protected void onBucketsUpdated() {
+        // Iterate over a snapshot: a callback that finds routers removes itself from
+        // routesUpdatedCallbacks, and removing from the HashSet while its iterator is live
+        // makes the next iteration step throw ConcurrentModificationException.
+        for(Runnable callBack: new ArrayList<>(routesUpdatedCallbacks)) {
+            callBack.run();
+        }
+    }
+
     /**
      * All messages used by the RpcRegistry
      */
index 39235bd..cc24e68 100644 (file)
@@ -111,6 +111,10 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         }
     }
 
+    protected RemoteRpcProviderConfig getConfig() { // exposes this store's configuration to subclasses
+        return config;
+    }
+
     /**
      * Returns all the buckets the this node knows about, self owned + remote
      */
@@ -224,6 +228,11 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         if(log.isDebugEnabled()) {
             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
         }
+
+        onBucketsUpdated();
+    }
+
+    protected void onBucketsUpdated() { // hook for subclasses, called after the buckets are updated; does nothing here
     }
 
     public BucketImpl<T> getLocalBucket() {
index fd1e6f1..2c2fd46 100644 (file)
@@ -8,12 +8,20 @@
 
 package org.opendaylight.controller.remote.rpc.registry;
 
+import static org.junit.Assert.fail;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.UniqueAddress;
 import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -24,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -71,12 +80,36 @@ public class RpcRegistryTest {
             }
         };
 
-        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").withConfigReader(reader).build();
-        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").withConfigReader(reader).build();
-        RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").withConfigReader(reader).build();
+        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms").
+                withConfigReader(reader).build();
+        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
+        RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
         node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+
+        waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+        waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+    }
+
+    static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) { // polls node's cluster state until every given address is Up, else fails the test
+        Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 10) { // keep polling for roughly 10 seconds
+            CurrentClusterState state = Cluster.get(node).state();
+            for(Member m: state.getMembers()) {
+                if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) &&
+                        otherMembersSet.isEmpty()) { // Up members are removed from the pending set; done once it is empty
+                    return;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); // pause 100 ms between polls
+        }
+
+        fail("Member(s) " + otherMembersSet + " are not Up"); // reports the addresses never seen as Up
     }
 
     @AfterClass
@@ -366,4 +399,41 @@ public class RpcRegistryTest {
         return routeIds;
     }
 
+    @Test
+    public void testFindRoutersNotPresentInitially() throws Exception { // FindRouters goes to registry1 before the routes are added to registry2
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
+
+        registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef()); // sent before the routes below are added
+
+        registry2.tell(new AddOrUpdateRoutes(routeIds), mockBroker2.getRef());
+
+        FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS), // allow up to 7 seconds for the reply
+                FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+        Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); // exactly one router is expected
+    }
+
+    @Test
+    public void testFindRoutersNonExistent() throws Exception { // no routes are added in this test, so the reply is expected to be empty
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
+
+        registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
+
+        FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS), // allow up to 7 seconds for the reply
+                FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+        Assert.assertEquals("getRouterWithUpdateTime size", 0, respList.size()); // expects an empty router list
+    }
 }
index 8e31081..f22b4ea 100644 (file)
@@ -56,7 +56,7 @@ memberA{
   }
   akka {
     loglevel = "INFO"
-    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -89,7 +89,7 @@ memberB{
   }
   akka {
     loglevel = "INFO"
-    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
 
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -123,7 +123,7 @@ memberC{
   }
   akka {
     loglevel = "INFO"
-    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties
new file mode 100644 (file)
index 0000000..d51b344
--- /dev/null
@@ -0,0 +1,6 @@
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.remote.rpc=debug