From 92ce52ab3df561a2a07bf56c7115123b0825449e Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 5 Feb 2016 02:42:54 -0500 Subject: [PATCH] Bug 4866: Add wait/retries for routed RPCs If a routed RPC is registered on one node it takes a little time for the route to propagate via gossip to other nodes. If another node tries to invoke the RPC prior to propagation it fails. To alleviate this timing issue, I added wait/retries via a timer in the RpcRegistry for the FindRouters message. As routes are updated via gossip, it retries the FindRouters request. If the timer triggers, it sends back an empty list. The timer period is 10 times the gossip tick interval (500ms * 10 = 5s). Change-Id: Iaafcfb4c93cde44f62f6645c8b8684102ac0d0db Signed-off-by: Tom Pantelis --- .../remote/rpc/RemoteRpcProviderConfig.java | 6 ++ .../remote/rpc/registry/RpcRegistry.java | 72 +++++++++++++++++- .../rpc/registry/gossip/BucketStore.java | 9 +++ .../remote/rpc/registry/RpcRegistryTest.java | 76 ++++++++++++++++++- .../src/test/resources/application.conf | 6 +- .../test/resources/simplelogger.properties | 6 ++ 6 files changed, 165 insertions(+), 10 deletions(-) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java index 6f3a10e648..995b1d5172 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java @@ -103,6 +103,12 @@ public class RemoteRpcProviderConfig extends CommonConfig { } + public Builder gossipTickInterval(String interval) { + configHolder.put(TAG_GOSSIP_TICK_INTERVAL, interval); + return this; + } + + @Override public RemoteRpcProviderConfig build(){ return new RemoteRpcProviderConfig(merge()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index c2ff0456f7..b467ce949a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,13 +8,18 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; +import akka.actor.Cancellable; import akka.actor.Props; import akka.japi.Creator; import akka.japi.Option; import akka.japi.Pair; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; @@ -26,6 +31,7 @@ import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryM import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; +import scala.concurrent.duration.FiniteDuration; /** * Registry to look up cluster nodes that have registered for a given rpc. @@ -34,10 +40,13 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; * cluster wide information. */ public class RpcRegistry extends BucketStore { + private final Set routesUpdatedCallbacks = new HashSet<>(); + private final FiniteDuration findRouterTimeout; public RpcRegistry(RemoteRpcProviderConfig config) { super(config); getLocalBucket().setData(new RoutingTable()); + findRouterTimeout = getConfig().getGossipTickInterval().$times(10); } public static Props props(RemoteRpcProviderConfig config) { @@ -56,6 +65,8 @@ public class RpcRegistry extends BucketStore { receiveRemoveRoutes((RemoveRoutes) message); } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); + } else if (message instanceof Runnable) { + ((Runnable)message).run(); } else { super.handleReceive(message); } @@ -83,6 +94,8 @@ public class RpcRegistry extends BucketStore { } updateLocalBucket(table); + + onBucketsUpdated(); } /** @@ -101,19 +114,63 @@ public class RpcRegistry extends BucketStore { /** * Finds routers for the given rpc. * - * @param msg + * @param findRouters */ - private void receiveGetRouter(FindRouters msg) { + private void receiveGetRouter(final FindRouters findRouters) { + log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier()); + + final ActorRef sender = getSender(); + if(!findRouters(findRouters, sender)) { + log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(), + findRouterTimeout.toMillis()); + + final AtomicReference timer = new AtomicReference<>(); + final Runnable routesUpdatedRunnable = new Runnable() { + @Override + public void run() { + if(findRouters(findRouters, sender)) { + routesUpdatedCallbacks.remove(this); + timer.get().cancel(); + } + } + }; + + routesUpdatedCallbacks.add(routesUpdatedRunnable); + + Runnable timerRunnable = new Runnable() { + @Override + public void run() { + log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier()); + + routesUpdatedCallbacks.remove(routesUpdatedRunnable); + sender.tell(new Messages.FindRoutersReply( + Collections.>emptyList()), self()); + } + }; + + timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable, + getContext().dispatcher(), self())); + } + } + + private boolean findRouters(FindRouters findRouters, ActorRef sender) { List> routers = new ArrayList<>(); - RouteIdentifier routeId = msg.getRouteIdentifier(); + RouteIdentifier routeId = findRouters.getRouteIdentifier(); findRoutes(getLocalBucket().getData(), routeId, routers); for(Bucket bucket : getRemoteBuckets().values()) { findRoutes(bucket.getData(), routeId, routers); } - getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); + log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier()); + + boolean foundRouters = !routers.isEmpty(); + if(foundRouters) { + sender.tell(new Messages.FindRoutersReply(routers), getSelf()); + } + + return foundRouters; } private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, @@ -128,6 +185,13 @@ public class RpcRegistry extends BucketStore { } } + @Override + protected void onBucketsUpdated() { + for(Runnable callBack: routesUpdatedCallbacks) { + callBack.run(); + } + } + /** * All messages used by the RpcRegistry */ diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 39235bd978..cc24e6845f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -111,6 +111,10 @@ public class BucketStore> extends AbstractUntypedActorWithMe } } + protected RemoteRpcProviderConfig getConfig() { + return config; + } + /** * Returns all the buckets the this node knows about, self owned + remote */ @@ -224,6 +228,11 @@ public class BucketStore> extends AbstractUntypedActorWithMe if(log.isDebugEnabled()) { log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); } + + onBucketsUpdated(); + } + + protected void onBucketsUpdated() { } public BucketImpl getLocalBucket() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index fd1e6f1db4..2c2fd46a29 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -8,12 +8,20 @@ package org.opendaylight.controller.remote.rpc.registry; +import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.UniqueAddress; import akka.japi.Pair; import akka.testkit.JavaTestKit; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -24,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.AfterClass; @@ -71,12 +80,36 @@ public class RpcRegistryTest { } }; - RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").withConfigReader(reader).build(); - RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").withConfigReader(reader).build(); - RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").withConfigReader(reader).build(); + RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms"). + withConfigReader(reader).build(); + RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms") + .withConfigReader(reader).build(); + RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms") + .withConfigReader(reader).build(); node1 = ActorSystem.create("opendaylight-rpc", config1.get()); node2 = ActorSystem.create("opendaylight-rpc", config2.get()); node3 = ActorSystem.create("opendaylight-rpc", config3.get()); + + waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress()); + } + + static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) { + Set otherMembersSet = Sets.newHashSet(addresses); + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 10) { + CurrentClusterState state = Cluster.get(node).state(); + for(Member m: state.getMembers()) { + if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) && + otherMembersSet.isEmpty()) { + return; + } + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + fail("Member(s) " + otherMembersSet + " are not Up"); } @AfterClass @@ -366,4 +399,41 @@ public class RpcRegistryTest { return routeIds; } + @Test + public void testFindRoutersNotPresentInitially() throws Exception { + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); + + List> routeIds = createRouteIds(); + + registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef()); + + registry2.tell(new AddOrUpdateRoutes(routeIds), mockBroker2.getRef()); + + FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS), + FindRoutersReply.class); + List> respList = reply.getRouterWithUpdateTime(); + Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); + } + + @Test + public void testFindRoutersNonExistent() throws Exception { + + final JavaTestKit mockBroker1 = new JavaTestKit(node1); + + registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); + + List> routeIds = createRouteIds(); + + registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef()); + + FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS), + FindRoutersReply.class); + List> respList = reply.getRouterWithUpdateTime(); + Assert.assertEquals("getRouterWithUpdateTime size", 0, respList.size()); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf index 8e310815fa..f22b4eafb3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf @@ -56,7 +56,7 @@ memberA{ } akka { loglevel = "INFO" - #loggers = ["akka.event.slf4j.Slf4jLogger"] + loggers = ["akka.event.slf4j.Slf4jLogger"] actor { provider = "akka.cluster.ClusterActorRefProvider" debug { @@ -89,7 +89,7 @@ memberB{ } akka { loglevel = "INFO" - #loggers = ["akka.event.slf4j.Slf4jLogger"] + loggers = ["akka.event.slf4j.Slf4jLogger"] actor { provider = "akka.cluster.ClusterActorRefProvider" @@ -123,7 +123,7 @@ memberC{ } akka { loglevel = "INFO" - #loggers = ["akka.event.slf4j.Slf4jLogger"] + loggers = ["akka.event.slf4j.Slf4jLogger"] actor { provider = "akka.cluster.ClusterActorRefProvider" debug { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties new file mode 100644 index 0000000000..d51b344686 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/simplelogger.properties @@ -0,0 +1,6 @@ +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a +org.slf4j.simpleLogger.logFile=System.out +org.slf4j.simpleLogger.showShortLogName=true +org.slf4j.simpleLogger.levelInBrackets=true +org.slf4j.simpleLogger.log.org.opendaylight.controller.remote.rpc=debug -- 2.36.6