X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=fc5b618663988d304d6b51468d2da9f99250d7b2;hp=845c1c819a70ca6bac0fb1a717b31f7861b6a6b6;hb=b78ee4d6b08e2cc0cf5edd01af0e54c3bf619ab5;hpb=1e80b656857bf829d8ae3cae21b0b726190b96ea
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
index 845c1c819a..fc5b618663 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
@@ -8,13 +8,18 @@
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.Cancellable;
+import akka.actor.Props;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
@@ -23,19 +28,27 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
- *
+ *
+ *
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
public class RpcRegistry extends BucketStore {
+ private final Set routesUpdatedCallbacks = new HashSet<>();
+ private final FiniteDuration findRouterTimeout;
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- public RpcRegistry() {
+ public RpcRegistry(RemoteRpcProviderConfig config) {
+ super(config);
getLocalBucket().setData(new RoutingTable());
+ findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+ }
+
+ public static Props props(RemoteRpcProviderConfig config) {
+ return Props.create(RpcRegistry.class, config);
}
@Override
@@ -50,13 +63,15 @@ public class RpcRegistry extends BucketStore {
receiveRemoveRoutes((RemoveRoutes) message);
} else if (message instanceof Messages.FindRouters) {
receiveGetRouter((FindRouters) message);
+ } else if (message instanceof Runnable) {
+ ((Runnable)message).run();
} else {
super.handleReceive(message);
}
}
/**
- * Register's rpc broker
+ * Registers a rpc broker.
*
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
@@ -64,22 +79,23 @@ public class RpcRegistry extends BucketStore {
getLocalBucket().getData().setRouter(message.getRouter());
}
- /**
- * @param msg
- */
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
RoutingTable table = getLocalBucket().getData().copy();
- for(RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ for (RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
table.addRoute(routeId);
}
updateLocalBucket(table);
+
+ onBucketsUpdated();
}
/**
+ * Processes a RemoveRoutes message.
+ *
* @param msg contains list of route ids to remove
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
@@ -95,19 +111,60 @@ public class RpcRegistry extends BucketStore {
/**
* Finds routers for the given rpc.
*
- * @param msg
+ * @param findRouters the FindRouters request
*/
- private void receiveGetRouter(FindRouters msg) {
+ private void receiveGetRouter(final FindRouters findRouters) {
+ log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
+
+ final ActorRef sender = getSender();
+ if (!findRouters(findRouters, sender)) {
+ log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+ findRouterTimeout.toMillis());
+
+ final AtomicReference timer = new AtomicReference<>();
+ final Runnable routesUpdatedRunnable = new Runnable() {
+ @Override
+ public void run() {
+ if (findRouters(findRouters, sender)) {
+ routesUpdatedCallbacks.remove(this);
+ timer.get().cancel();
+ }
+ }
+ };
+
+ routesUpdatedCallbacks.add(routesUpdatedRunnable);
+
+ Runnable timerRunnable = () -> {
+ log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+
+ routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+ sender.tell(new Messages.FindRoutersReply(
+ Collections.>emptyList()), self());
+ };
+
+ timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+ getContext().dispatcher(), self()));
+ }
+ }
+
+ private boolean findRouters(FindRouters findRouters, ActorRef sender) {
List> routers = new ArrayList<>();
- RouteIdentifier, ?, ?> routeId = msg.getRouteIdentifier();
+ RouteIdentifier, ?, ?> routeId = findRouters.getRouteIdentifier();
findRoutes(getLocalBucket().getData(), routeId, routers);
- for(Bucket bucket : getRemoteBuckets().values()) {
+ for (Bucket bucket : getRemoteBuckets().values()) {
findRoutes(bucket.getData(), routeId, routers);
}
- getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
+ log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
+
+ boolean foundRouters = !routers.isEmpty();
+ if (foundRouters) {
+ sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+ }
+
+ return foundRouters;
}
private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier, ?, ?> routeId,
@@ -117,13 +174,24 @@ public class RpcRegistry extends BucketStore {
}
Option> routerWithUpdateTime = table.getRouterFor(routeId);
- if(!routerWithUpdateTime.isEmpty()) {
+ if (!routerWithUpdateTime.isEmpty()) {
routers.add(routerWithUpdateTime.get());
}
}
+ @Override
+ protected void onBucketsUpdated() {
+ if (routesUpdatedCallbacks.isEmpty()) {
+ return;
+ }
+
+ for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+ callBack.run();
+ }
+ }
+
/**
- * All messages used by the RpcRegistry
+ * All messages used by the RpcRegistry.
*/
public static class Messages {
@@ -132,9 +200,8 @@ public class RpcRegistry extends BucketStore {
final List> routeIdentifiers;
public ContainsRoute(List> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null &&
- !routeIdentifiers.isEmpty(),
- "Route Identifiers must be supplied");
+ Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
this.routeIdentifiers = routeIdentifiers;
}
@@ -144,9 +211,7 @@ public class RpcRegistry extends BucketStore {
@Override
public String toString() {
- return "ContainsRoute{" +
- "routeIdentifiers=" + routeIdentifiers +
- '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
}
}
@@ -178,9 +243,7 @@ public class RpcRegistry extends BucketStore {
@Override
public String toString() {
- return "SetLocalRouter{" +
- "router=" + router +
- '}';
+ return "SetLocalRouter{" + "router=" + router + '}';
}
}
@@ -198,9 +261,7 @@ public class RpcRegistry extends BucketStore {
@Override
public String toString() {
- return "FindRouters{" +
- "routeIdentifier=" + routeIdentifier +
- '}';
+ return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
}
}
@@ -218,9 +279,7 @@ public class RpcRegistry extends BucketStore {
@Override
public String toString() {
- return "FindRoutersReply{" +
- "routerWithUpdateTime=" + routerWithUpdateTime +
- '}';
+ return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
}
}
}