}
+ public Builder gossipTickInterval(String interval) {
+ configHolder.put(TAG_GOSSIP_TICK_INTERVAL, interval);
+ return this;
+ }
+
+ @Override
public RemoteRpcProviderConfig build(){
return new RemoteRpcProviderConfig(merge());
}
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
import akka.actor.Props;
import akka.japi.Creator;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
* cluster wide information.
*/
public class RpcRegistry extends BucketStore<RoutingTable> {
+ private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+ private final FiniteDuration findRouterTimeout;
public RpcRegistry(RemoteRpcProviderConfig config) {
super(config);
getLocalBucket().setData(new RoutingTable());
+ findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
}
public static Props props(RemoteRpcProviderConfig config) {
receiveRemoveRoutes((RemoveRoutes) message);
} else if (message instanceof Messages.FindRouters) {
receiveGetRouter((FindRouters) message);
+ } else if (message instanceof Runnable) {
+ ((Runnable)message).run();
} else {
super.handleReceive(message);
}
}
updateLocalBucket(table);
+
+ onBucketsUpdated();
}
/**
/**
* Finds routers for the given rpc.
*
- * @param msg
+ * @param findRouters
*/
- private void receiveGetRouter(FindRouters msg) {
+ private void receiveGetRouter(final FindRouters findRouters) {
+ log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+ final ActorRef sender = getSender();
+ if(!findRouters(findRouters, sender)) {
+ log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+ findRouterTimeout.toMillis());
+
+ final AtomicReference<Cancellable> timer = new AtomicReference<>();
+ final Runnable routesUpdatedRunnable = new Runnable() {
+ @Override
+ public void run() {
+ if(findRouters(findRouters, sender)) {
+ routesUpdatedCallbacks.remove(this);
+ timer.get().cancel();
+ }
+ }
+ };
+
+ routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+ Runnable timerRunnable = new Runnable() {
+ @Override
+ public void run() {
+ log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+ routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+ sender.tell(new Messages.FindRoutersReply(
+ Collections.<Pair<ActorRef, Long>>emptyList()), self());
+ }
+ };
+
+ timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+ getContext().dispatcher(), self()));
+ }
+ }
+
+ private boolean findRouters(FindRouters findRouters, ActorRef sender) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
- RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+ RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
findRoutes(getLocalBucket().getData(), routeId, routers);
for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
findRoutes(bucket.getData(), routeId, routers);
}
- getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+ log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+ boolean foundRouters = !routers.isEmpty();
+ if(foundRouters) {
+ sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+ }
+
+ return foundRouters;
}
private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
}
}
+ @Override
+ protected void onBucketsUpdated() {
+ for(Runnable callBack: routesUpdatedCallbacks) {
+ callBack.run();
+ }
+ }
+
/**
* All messages used by the RpcRegistry
*/
}
}
+ protected RemoteRpcProviderConfig getConfig() {
+ return config;
+ }
+
/**
* Returns all the buckets the this node knows about, self owned + remote
*/
if(log.isDebugEnabled()) {
log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
}
+
+ onBucketsUpdated();
+ }
+
+ protected void onBucketsUpdated() {
}
public BucketImpl<T> getLocalBucket() {
package org.opendaylight.controller.remote.rpc.registry;
+import static org.junit.Assert.fail;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.UniqueAddress;
import akka.japi.Pair;
import akka.testkit.JavaTestKit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
}
};
- RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").withConfigReader(reader).build();
- RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").withConfigReader(reader).build();
- RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").withConfigReader(reader).build();
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms").
+ withConfigReader(reader).build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
+ .withConfigReader(reader).build();
+ RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
+ .withConfigReader(reader).build();
node1 = ActorSystem.create("opendaylight-rpc", config1.get());
node2 = ActorSystem.create("opendaylight-rpc", config2.get());
node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+
+ waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+ waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
+ }
+
+ static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) {
+ Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ CurrentClusterState state = Cluster.get(node).state();
+ for(Member m: state.getMembers()) {
+ if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) &&
+ otherMembersSet.isEmpty()) {
+ return;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ fail("Member(s) " + otherMembersSet + " are not Up");
}
@AfterClass
return routeIds;
}
+ @Test
+ public void testFindRoutersNotPresentInitially() throws Exception {
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
+
+ registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
+
+ registry2.tell(new AddOrUpdateRoutes(routeIds), mockBroker2.getRef());
+
+ FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
+ FindRoutersReply.class);
+ List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+ Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+ }
+
+ @Test
+ public void testFindRoutersNonExistent() throws Exception {
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = createRouteIds();
+
+ registry1.tell(new FindRouters(routeIds.get(0)), mockBroker1.getRef());
+
+ FindRoutersReply reply = mockBroker1.expectMsgClass(Duration.create(7, TimeUnit.SECONDS),
+ FindRoutersReply.class);
+ List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+ Assert.assertEquals("getRouterWithUpdateTime size", 0, respList.size());
+ }
}
}
akka {
loglevel = "INFO"
- #loggers = ["akka.event.slf4j.Slf4jLogger"]
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
debug {
}
akka {
loglevel = "INFO"
- #loggers = ["akka.event.slf4j.Slf4jLogger"]
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
akka {
loglevel = "INFO"
- #loggers = ["akka.event.slf4j.Slf4jLogger"]
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
debug {
--- /dev/null
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.remote.rpc=debug