X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=1545eb00d2cc9af4bda80881d197bad132f0e667;hb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;hp=7c5efc29e8640c6028ca829fd022b12aea010400;hpb=9ddc65e1ddae50f691566cd9382707679436c055;p=controller.git
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
index 7c5efc29e8..1545eb00d2 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
@@ -8,81 +8,72 @@
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import akka.actor.Address;
import akka.actor.Props;
-import akka.japi.Creator;
-import akka.japi.Option;
-import akka.japi.Pair;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
- * Registry to look up cluster nodes that have registered for a given rpc.
- *
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ *
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
public class RpcRegistry extends BucketStore {
- private final Set routesUpdatedCallbacks = new HashSet<>();
- private final FiniteDuration findRouterTimeout;
+ private final ActorRef rpcRegistrar;
- public RpcRegistry(RemoteRpcProviderConfig config) {
- super(config);
- getLocalBucket().setData(new RoutingTable());
- findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+ public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, new RoutingTable(rpcInvoker));
+ this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
}
- public static Props props(RemoteRpcProviderConfig config) {
- return Props.create(new RpcRegistryCreator(config));
+ /**
+ * Create a new props instance for instantiating an RpcRegistry actor.
+ *
+ * @param config Provider configuration
+ * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
+ */
+ public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
@Override
- protected void handleReceive(Object message) throws Exception {
- //TODO: if sender is remote, reject message
-
- if (message instanceof SetLocalRouter) {
- receiveSetLocalRouter((SetLocalRouter) message);
- } else if (message instanceof AddOrUpdateRoutes) {
+ protected void handleReceive(final Object message) throws Exception {
+ if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
} else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
- } else if (message instanceof Messages.FindRouters) {
- receiveGetRouter((FindRouters) message);
- } else if (message instanceof Runnable) {
- ((Runnable)message).run();
} else {
super.handleReceive(message);
}
}
- /**
- * Registers a rpc broker.
- *
- * @param message contains {@link akka.actor.ActorRef} for rpc broker
- */
- private void receiveSetLocalRouter(SetLocalRouter message) {
- getLocalBucket().getData().setRouter(message.getRouter());
- }
-
- private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
- log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+ private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+ LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
RoutingTable table = getLocalBucket().getData().copy();
for (RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
@@ -90,8 +81,6 @@ public class RpcRegistry extends BucketStore {
}
updateLocalBucket(table);
-
- onBucketsUpdated();
}
/**
@@ -99,8 +88,7 @@ public class RpcRegistry extends BucketStore {
*
* @param msg contains list of route ids to remove
*/
- private void receiveRemoveRoutes(RemoveRoutes msg) {
-
+ private void receiveRemoveRoutes(final RemoveRoutes msg) {
RoutingTable table = getLocalBucket().getData().copy();
for (RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
table.removeRoute(routeId);
@@ -109,85 +97,52 @@ public class RpcRegistry extends BucketStore {
updateLocalBucket(table);
}
- /**
- * Finds routers for the given rpc.
- *
- * @param findRouters the FindRouters request
- */
- private void receiveGetRouter(final FindRouters findRouters) {
- log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
-
- final ActorRef sender = getSender();
- if (!findRouters(findRouters, sender)) {
- log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
- findRouterTimeout.toMillis());
-
- final AtomicReference timer = new AtomicReference<>();
- final Runnable routesUpdatedRunnable = new Runnable() {
- @Override
- public void run() {
- if (findRouters(findRouters, sender)) {
- routesUpdatedCallbacks.remove(this);
- timer.get().cancel();
- }
- }
- };
-
- routesUpdatedCallbacks.add(routesUpdatedRunnable);
-
- Runnable timerRunnable = () -> {
- log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
-
- routesUpdatedCallbacks.remove(routesUpdatedRunnable);
- sender.tell(new Messages.FindRoutersReply(
- Collections.>emptyList()), self());
- };
-
- timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
- getContext().dispatcher(), self()));
- }
+ @Override
+ protected void onBucketRemoved(final Address address, final Bucket bucket) {
+ rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
}
- private boolean findRouters(FindRouters findRouters, ActorRef sender) {
- List> routers = new ArrayList<>();
-
- RouteIdentifier, ?, ?> routeId = findRouters.getRouteIdentifier();
- findRoutes(getLocalBucket().getData(), routeId, routers);
+ @Override
+ protected void onBucketsUpdated(final Map> buckets) {
+ final Map> endpoints = new HashMap<>(buckets.size());
+
+ for (Entry> e : buckets.entrySet()) {
+ final RoutingTable table = e.getValue().getData();
+
+ final List rpcs = new ArrayList<>(table.getRoutes().size());
+ for (RouteIdentifier, ?, ?> ri : table.getRoutes()) {
+ if (ri instanceof RouteIdentifierImpl) {
+ final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+ rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+ } else {
+ LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
+ }
+ }
- for (Bucket bucket : getRemoteBuckets().values()) {
- findRoutes(bucket.getData(), routeId, routers);
+ endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+ : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
}
- log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
-
- boolean foundRouters = !routers.isEmpty();
- if (foundRouters) {
- sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+ if (!endpoints.isEmpty()) {
+ rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
}
-
- return foundRouters;
}
- private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier, ?, ?> routeId,
- List> routers) {
- if (table == null) {
- return;
- }
+ public static final class RemoteRpcEndpoint {
+ private final Set rpcs;
+ private final ActorRef router;
- Option> routerWithUpdateTime = table.getRouterFor(routeId);
- if (!routerWithUpdateTime.isEmpty()) {
- routers.add(routerWithUpdateTime.get());
+ RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) {
+ this.router = Preconditions.checkNotNull(router);
+ this.rpcs = ImmutableSet.copyOf(rpcs);
}
- }
- @Override
- protected void onBucketsUpdated() {
- if (routesUpdatedCallbacks.isEmpty()) {
- return;
+ public ActorRef getRouter() {
+ return router;
}
- for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
- callBack.run();
+ public Set getRpcs() {
+ return rpcs;
}
}
@@ -195,18 +150,16 @@ public class RpcRegistry extends BucketStore {
* All messages used by the RpcRegistry.
*/
public static class Messages {
-
-
- public static class ContainsRoute {
+ abstract static class AbstractRouteMessage {
final List> routeIdentifiers;
- public ContainsRoute(List> routeIdentifiers) {
+ AbstractRouteMessage(final List> routeIdentifiers) {
Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
"Route Identifiers must be supplied");
this.routeIdentifiers = routeIdentifiers;
}
- public List> getRouteIdentifiers() {
+ List> getRouteIdentifiers() {
return this.routeIdentifiers;
}
@@ -216,88 +169,28 @@ public class RpcRegistry extends BucketStore {
}
}
- public static class AddOrUpdateRoutes extends ContainsRoute {
-
- public AddOrUpdateRoutes(List> routeIdentifiers) {
+ public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+ public AddOrUpdateRoutes(final List> routeIdentifiers) {
super(routeIdentifiers);
}
}
- public static class RemoveRoutes extends ContainsRoute {
-
- public RemoveRoutes(List> routeIdentifiers) {
+ public static final class RemoveRoutes extends AbstractRouteMessage {
+ public RemoveRoutes(final List> routeIdentifiers) {
super(routeIdentifiers);
}
}
- public static class SetLocalRouter {
- private final ActorRef router;
-
- public SetLocalRouter(ActorRef router) {
- Preconditions.checkArgument(router != null, "Router must not be null");
- this.router = router;
- }
-
- public ActorRef getRouter() {
- return this.router;
- }
-
- @Override
- public String toString() {
- return "SetLocalRouter{" + "router=" + router + '}';
- }
- }
-
- public static class FindRouters {
- private final RpcRouter.RouteIdentifier, ?, ?> routeIdentifier;
-
- public FindRouters(RpcRouter.RouteIdentifier, ?, ?> routeIdentifier) {
- Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
- this.routeIdentifier = routeIdentifier;
- }
-
- public RpcRouter.RouteIdentifier, ?, ?> getRouteIdentifier() {
- return routeIdentifier;
- }
-
- @Override
- public String toString() {
- return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
- }
- }
-
- public static class FindRoutersReply {
- final List> routerWithUpdateTime;
+ public static final class UpdateRemoteEndpoints {
+ private final Map> endpoints;
- public FindRoutersReply(List> routerWithUpdateTime) {
- Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
- this.routerWithUpdateTime = routerWithUpdateTime;
+ UpdateRemoteEndpoints(final Map> endpoints) {
+ this.endpoints = ImmutableMap.copyOf(endpoints);
}
- public List> getRouterWithUpdateTime() {
- return routerWithUpdateTime;
+ public Map> getEndpoints() {
+ return endpoints;
}
-
- @Override
- public String toString() {
- return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
- }
- }
- }
-
- private static class RpcRegistryCreator implements Creator {
- private static final long serialVersionUID = 1L;
- private final RemoteRpcProviderConfig config;
-
- private RpcRegistryCreator(RemoteRpcProviderConfig config) {
- this.config = config;
- }
-
- @Override
- public RpcRegistry create() throws Exception {
- RpcRegistry registry = new RpcRegistry(config);
- new RemoteRpcRegistryMXBeanImpl(registry);
- return registry;
}
}
}