X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=8d66ed8ccb163abc56891009bfe09cfdae29add6;hb=HEAD;hp=219646d8478ade824d22589842c4d4ddf1edccaa;hpb=e4c11407593914ed4520253909d0d7669e51cfac;p=controller.git
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
index 219646d847..8d66ed8ccb 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
@@ -7,216 +7,193 @@
*/
package org.opendaylight.controller.remote.rpc.registry;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
-import akka.japi.Option;
-import akka.japi.Pair;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import akka.actor.Address;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
/**
- * Registry to look up cluster nodes that have registered for a given rpc.
- *
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ *
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends BucketStore {
+public class RpcRegistry extends BucketStoreActor {
+ private final ActorRef rpcRegistrar;
+ private RemoteRpcRegistryMXBeanImpl mxBean;
- public RpcRegistry() {
- getLocalBucket().setData(new RoutingTable());
- }
+ public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+ this.rpcRegistrar = requireNonNull(rpcRegistrar);
- @Override
- protected void handleReceive(Object message) throws Exception {
- //TODO: if sender is remote, reject message
-
- if (message instanceof SetLocalRouter) {
- receiveSetLocalRouter((SetLocalRouter) message);
- } else if (message instanceof AddOrUpdateRoutes) {
- receiveAddRoutes((AddOrUpdateRoutes) message);
- } else if (message instanceof RemoveRoutes) {
- receiveRemoveRoutes((RemoveRoutes) message);
- } else if (message instanceof Messages.FindRouters) {
- receiveGetRouter((FindRouters) message);
- } else {
- super.handleReceive(message);
- }
}
/**
- * Register's rpc broker
+ * Create a new props instance for instantiating an RpcRegistry actor.
*
- * @param message contains {@link akka.actor.ActorRef} for rpc broker
+ * @param config Provider configuration
+ * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+ * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+ * @return A new {@link Props} instance
*/
- private void receiveSetLocalRouter(SetLocalRouter message) {
- getLocalBucket().getData().setRouter(message.getRouter());
+ public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+ final ActorRef rpcRegistrar) {
+ return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
}
- /**
- * @param msg
- */
- private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
- log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+ @Override
+ public void preStart() {
+ super.preStart();
+ mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+ getConfig().getAskDuration()), getConfig().getAskDuration());
+ }
- RoutingTable table = getLocalBucket().getData().copy();
- for(RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
- table.addRoute(routeId);
+ @Override
+ public void postStop() throws Exception {
+ if (mxBean != null) {
+ mxBean.unregister();
+ mxBean = null;
}
-
- updateLocalBucket(table);
+ super.postStop();
}
- /**
- * @param msg contains list of route ids to remove
- */
- private void receiveRemoveRoutes(RemoveRoutes msg) {
-
- RoutingTable table = getLocalBucket().getData().copy();
- for (RpcRouter.RouteIdentifier, ?, ?> routeId : msg.getRouteIdentifiers()) {
- table.removeRoute(routeId);
+ @Override
+ protected void handleCommand(final Object message) throws Exception {
+ if (message instanceof AddOrUpdateRoutes addRoutes) {
+ receiveAddRoutes(addRoutes);
+ } else if (message instanceof RemoveRoutes removeRoutes) {
+ receiveRemoveRoutes(removeRoutes);
+ } else {
+ super.handleCommand(message);
}
+ }
- updateLocalBucket(table);
+ private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+ LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+ updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
}
/**
- * Finds routers for the given rpc.
+ * Processes a RemoveRoutes message.
*
- * @param msg
+ * @param msg contains list of route ids to remove
*/
- private void receiveGetRouter(FindRouters msg) {
- List> routers = new ArrayList<>();
+ private void receiveRemoveRoutes(final RemoveRoutes msg) {
+ LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+ updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
+ }
- RouteIdentifier, ?, ?> routeId = msg.getRouteIdentifier();
- findRoutes(getLocalBucket().getData(), routeId, routers);
+ @Override
+ protected void onBucketRemoved(final Address address, final Bucket bucket) {
+ rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+ ActorRef.noSender());
+ }
- for(Bucket bucket : getRemoteBuckets().values()) {
- findRoutes(bucket.getData(), routeId, routers);
- }
+ @Override
+ protected void onBucketsUpdated(final Map> buckets) {
+ final Map> endpoints = new HashMap<>(buckets.size());
- getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
- }
+ for (Entry> e : buckets.entrySet()) {
+ final RoutingTable table = e.getValue().getData();
- private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier, ?, ?> routeId,
- List> routers) {
- if (table == null) {
- return;
+ final Collection rpcs = table.getItems();
+ endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+ : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
}
- Option> routerWithUpdateTime = table.getRouterFor(routeId);
- if(!routerWithUpdateTime.isEmpty()) {
- routers.add(routerWithUpdateTime.get());
+ if (!endpoints.isEmpty()) {
+ rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
}
}
- /**
- * All messages used by the RpcRegistry
- */
- public static class Messages {
-
-
- public static class ContainsRoute {
- final List> routeIdentifiers;
-
- public ContainsRoute(List> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null &&
- !routeIdentifiers.isEmpty(),
- "Route Identifiers must be supplied");
- this.routeIdentifiers = routeIdentifiers;
- }
-
- public List> getRouteIdentifiers() {
- return this.routeIdentifiers;
- }
+ public static final class RemoteRpcEndpoint {
+ private final Set rpcs;
+ private final ActorRef router;
- @Override
- public String toString() {
- return "ContainsRoute{" +
- "routeIdentifiers=" + routeIdentifiers +
- '}';
- }
+ @VisibleForTesting
+ public RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) {
+ this.router = requireNonNull(router);
+ this.rpcs = ImmutableSet.copyOf(rpcs);
}
- public static class AddOrUpdateRoutes extends ContainsRoute {
-
- public AddOrUpdateRoutes(List> routeIdentifiers) {
- super(routeIdentifiers);
- }
+ public ActorRef getRouter() {
+ return router;
}
- public static class RemoveRoutes extends ContainsRoute {
-
- public RemoveRoutes(List> routeIdentifiers) {
- super(routeIdentifiers);
- }
+ public Set getRpcs() {
+ return rpcs;
}
+ }
- public static class SetLocalRouter {
- private final ActorRef router;
+ /**
+ * All messages used by the RpcRegistry.
+ */
+ public static class Messages {
+ abstract static class AbstractRouteMessage {
+ final List rpcRouteIdentifiers;
- public SetLocalRouter(ActorRef router) {
- Preconditions.checkArgument(router != null, "Router must not be null");
- this.router = router;
+ AbstractRouteMessage(final Collection rpcRouteIdentifiers) {
+ checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
}
- public ActorRef getRouter() {
- return this.router;
+ List getRouteIdentifiers() {
+ return rpcRouteIdentifiers;
}
@Override
public String toString() {
- return "SetLocalRouter{" +
- "router=" + router +
- '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
}
}
- public static class FindRouters {
- private final RpcRouter.RouteIdentifier, ?, ?> routeIdentifier;
-
- public FindRouters(RpcRouter.RouteIdentifier, ?, ?> routeIdentifier) {
- Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
- this.routeIdentifier = routeIdentifier;
+ public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+ public AddOrUpdateRoutes(final Collection rpcRouteIdentifiers) {
+ super(rpcRouteIdentifiers);
}
- public RpcRouter.RouteIdentifier, ?, ?> getRouteIdentifier() {
- return routeIdentifier;
- }
+ }
- @Override
- public String toString() {
- return "FindRouters{" +
- "routeIdentifier=" + routeIdentifier +
- '}';
+ public static final class RemoveRoutes extends AbstractRouteMessage {
+ public RemoveRoutes(final Collection rpcRouteIdentifiers) {
+ super(rpcRouteIdentifiers);
}
}
- public static class FindRoutersReply {
- final List> routerWithUpdateTime;
+ public static final class UpdateRemoteEndpoints {
+ private final Map> rpcEndpoints;
- public FindRoutersReply(List> routerWithUpdateTime) {
- Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
- this.routerWithUpdateTime = routerWithUpdateTime;
- }
- public List> getRouterWithUpdateTime() {
- return routerWithUpdateTime;
+ @VisibleForTesting
+ public UpdateRemoteEndpoints(final Map> rpcEndpoints) {
+ this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
}
- @Override
- public String toString() {
- return "FindRoutersReply{" +
- "routerWithUpdateTime=" + routerWithUpdateTime +
- '}';
+ public Map> getRpcEndpoints() {
+ return rpcEndpoints;
}
}
}