Bug 4866: Add wait/retries for routed RPCs 75/34175/4
authorTom Pantelis <tpanteli@brocade.com>
Fri, 5 Feb 2016 07:42:54 +0000 (02:42 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 23 Feb 2016 22:01:08 +0000 (17:01 -0500)
If a routed RPC is registered on one node it takes a little time for the
route to propagate via gossip to other nodes. If another node tries to
invoke the RPC prior to propagation it fails. To alleviate this timing
issue, I added wait/retries via a timer in the RpcRegistry for the
FindRouters message. As routes are updated via gossip, it retries the
FindRouters request. If the timer triggers, it sends back an empty list.
The timer period is 10 times the gossip tick interval (500ms * 10 = 5s).

Change-Id: Iaafcfb4c93cde44f62f6645c8b8684102ac0d0db
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties [new file with mode: 0644]

index 6f3a10e6484cb115b1028dc83fa4f6c2063c1883..995b1d5172553f234e6602a11ccb236c19bf0d50 100644 (file)
@@ -103,6 +103,12 @@ public class RemoteRpcProviderConfig extends CommonConfig {
 
         }
 
 
         }
 
+        public Builder gossipTickInterval(String interval) {
+            configHolder.put(TAG_GOSSIP_TICK_INTERVAL, interval);
+            return this;
+        }
+
+        @Override
         public RemoteRpcProviderConfig build(){
             return new RemoteRpcProviderConfig(merge());
         }
         public RemoteRpcProviderConfig build(){
             return new RemoteRpcProviderConfig(merge());
         }
index c2ff0456f7cd04b5da3289a70a4fdf29bce84bc3..b467ce949ae4a86e9500fffa9f05b85c0ba407e1 100644 (file)
@@ -8,13 +8,18 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
@@ -26,6 +31,7 @@ import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryM
 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
@@ -34,10 +40,13 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
 
     public RpcRegistry(RemoteRpcProviderConfig config) {
         super(config);
         getLocalBucket().setData(new RoutingTable());
 
     public RpcRegistry(RemoteRpcProviderConfig config) {
         super(config);
         getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
     }
 
     public static Props props(RemoteRpcProviderConfig config) {
     }
 
     public static Props props(RemoteRpcProviderConfig config) {
@@ -56,6 +65,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
             receiveRemoveRoutes((RemoveRoutes) message);
         } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
         } else {
             super.handleReceive(message);
         }
         } else {
             super.handleReceive(message);
         }
@@ -83,6 +94,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         updateLocalBucket(table);
         }
 
         updateLocalBucket(table);
+
+        onBucketsUpdated();
     }
 
     /**
     }
 
     /**
@@ -101,19 +114,63 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     /**
      * Finds routers for the given rpc.
      *
     /**
      * Finds routers for the given rpc.
      *
-     * @param msg
+     * @param findRouters
      */
      */
-    private void receiveGetRouter(FindRouters msg) {
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+        final ActorRef sender = getSender();
+        if(!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if(findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
+                    }
+                }
+            };
+
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+            Runnable timerRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+                    routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                    sender.tell(new Messages.FindRoutersReply(
+                            Collections.<Pair<ActorRef, Long>>emptyList()), self());
+                }
+            };
+
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
+        }
+    }
+
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
         findRoutes(getLocalBucket().getData(), routeId, routers);
 
         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
             findRoutes(bucket.getData(), routeId, routers);
         }
 
         findRoutes(getLocalBucket().getData(), routeId, routers);
 
         for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
             findRoutes(bucket.getData(), routeId, routers);
         }
 
-        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+        boolean foundRouters = !routers.isEmpty();
+        if(foundRouters) {
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
+
+        return foundRouters;
     }
 
     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
     }
 
     private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
@@ -128,6 +185,13 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
     }
 
         }
     }
 
+    /**
+     * Re-runs every pending FindRouters retry callback after the buckets have changed.
+     */
+    @Override
+    protected void onBucketsUpdated() {
+        if(routesUpdatedCallbacks.isEmpty()) {
+            return;
+        }
+
+        // Iterate over a snapshot: a callback that finds its routers removes itself from
+        // routesUpdatedCallbacks, and removing from a HashSet while iterating it directly makes the
+        // iterator throw ConcurrentModificationException as soon as more than one lookup is pending.
+        for(Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+            callBack.run();
+        }
+    }
+
     /**
      * All messages used by the RpcRegistry
      */
     /**
      * All messages used by the RpcRegistry
      */
index 39235bd97834786c02b250393d9e16f10b8a2064..cc24e6845f852df17b52e604cc113f7e34a4a6d8 100644 (file)
@@ -111,6 +111,10 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         }
     }
 
         }
     }
 
+    /**
+     * Gives subclasses access to the configuration held by this store.
+     *
+     * @return the value of the {@code config} field
+     */
+    protected RemoteRpcProviderConfig getConfig() {
+        return this.config;
+    }
+
     /**
      * Returns all the buckets that this node knows about, self owned + remote
      */
     /**
      * Returns all the buckets that this node knows about, self owned + remote
      */
@@ -224,6 +228,11 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         if(log.isDebugEnabled()) {
             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
         }
         if(log.isDebugEnabled()) {
             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
         }
+
+        onBucketsUpdated();
+    }
+
+    protected void onBucketsUpdated() {
     }
 
     public BucketImpl<T> getLocalBucket() {
     }
 
     public BucketImpl<T> getLocalBucket() {
index fd1e6f1db4390a354aae9a69f18377513d85573d..2c2fd46a2914ca8a41aaae38d1ae9a3165d77685 100644 (file)
@@ -8,12 +8,20 @@
 
 package org.opendaylight.controller.remote.rpc.registry;
 
 
 package org.opendaylight.controller.remote.rpc.registry;
 
+import static org.junit.Assert.fail;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.UniqueAddress;
 import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
 import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -24,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.AfterClass;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -71,12 +80,36 @@ public class RpcRegistryTest {
             }
         };
 
             }
         };
 
-        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").withConfigReader(reader).build();
-        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").withConfigReader(reader).build();
-        RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").withConfigReader(reader).build();
+        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms").
+                withConfigReader(reader).build();
+        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
+        RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+                .withConfigReader(reader).build();
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
         node3 = ActorSystem.create("opendaylight-rpc", config3.get());
         node1 = ActorSystem.create("opendaylight-rpc", config1.get());
         node2 = ActorSystem.create("opendaylight-rpc", config2.get());
         node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+
+        waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+        waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+    }
+
+    static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) {
+        Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+            CurrentClusterState state = Cluster.get(node).state();
+            for(Member m: state.getMembers()) {
+                if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) &&
+                        otherMembersSet.isEmpty()) {
+                    return;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+
+        fail("Member(s) " + otherMembersSet + " are not Up");
     }
 
     @AfterClass
     }
 
     @AfterClass
@@ -366,4 +399,41 @@ public class RpcRegistryTest {
         return routeIds;
     }
 
         return routeIds;
     }
 
+    @Test
+    public void testFindRoutersNotPresentInitially() throws Exception {
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
+
+        registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
+
+        registry2.tell(new AddOrUpdateRoutes(routeIds), mockBroker2.getRef());
+
+        FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
+                FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+        Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+    }
+
+    @Test
+    public void testFindRoutersNonExistent() throws Exception {
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
+
+        registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
+
+        FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
+                FindRoutersReply.class);
+        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+        Assert.assertEquals("getRouterWithUpdateTime size", 0, respList.size());
+    }
 }
 }
index 8e310815faed92617d626d14a5433bd61664a00b..f22b4eafb390fac9774afbe301bd5d375f1ec9c9 100644 (file)
@@ -56,7 +56,7 @@ memberA{
   }
   akka {
     loglevel = "INFO"
   }
   akka {
     loglevel = "INFO"
-    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -89,7 +89,7 @@ memberB{
   }
   akka {
     loglevel = "INFO"
   }
   akka {
     loglevel = "INFO"
-    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
 
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
 
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -123,7 +123,7 @@ memberC{
   }
   akka {
     loglevel = "INFO"
   }
   akka {
     loglevel = "INFO"
-    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties
new file mode 100644 (file)
index 0000000..d51b344
--- /dev/null
@@ -0,0 +1,6 @@
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.remote.rpc=debug