Teach sal-remoterpc-connector to route actions
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 845c1c819a70ca6bac0fb1a717b31f7861b6a6b6..68fead4407781f534b0f9b4aee3ae7e2b012f908 100644 (file)
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Option;
-import akka.japi.Pair;
+import akka.actor.Address;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 
 /**
- * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends BucketStore<RoutingTable> {
-
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
+    private final ActorRef rpcRegistrar;
+    private final RemoteRpcRegistryMXBeanImpl mxBean;
+
+    public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+        this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+                config.getAskDuration()), config.getAskDuration());
+    }
 
-    public RpcRegistry() {
-        getLocalBucket().setData(new RoutingTable());
+    /**
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+                              final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
     }
 
     @Override
-    protected void handleReceive(Object message) throws Exception {
-        //TODO: if sender is remote, reject message
+    public void postStop() {
+        super.postStop();
+        this.mxBean.unregister();
+    }
 
-        if (message instanceof SetLocalRouter) {
-            receiveSetLocalRouter((SetLocalRouter) message);
-        } else if (message instanceof AddOrUpdateRoutes) {
+    @Override
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-        } else if (message instanceof Messages.FindRouters) {
-            receiveGetRouter((FindRouters) message);
         } else {
-            super.handleReceive(message);
+            super.handleCommand(message);
         }
     }
 
-    /**
-     * Register's rpc broker
-     *
-     * @param message contains {@link akka.actor.ActorRef} for rpc broker
-     */
-    private void receiveSetLocalRouter(SetLocalRouter message) {
-        getLocalBucket().getData().setRouter(message.getRouter());
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
-     * @param msg
+     * Processes a RemoveRoutes message.
+     *
+     * @param msg contains list of route ids to remove
      */
-    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
-        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
-
-        RoutingTable table = getLocalBucket().getData().copy();
-        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.addRoute(routeId);
-        }
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
+        LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
+    }
 
-        updateLocalBucket(table);
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+                ActorRef.noSender());
     }
 
-    /**
-     * @param msg contains list of route ids to remove
-     */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
 
-        RoutingTable table = getLocalBucket().getData().copy();
-        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.removeRoute(routeId);
+            final Collection<DOMRpcIdentifier> rpcs = table.getItems();
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
         }
 
-        updateLocalBucket(table);
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+        }
     }
 
-    /**
-     * Finds routers for the given rpc.
-     *
-     * @param msg
-     */
-    private void receiveGetRouter(FindRouters msg) {
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
-        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
-        findRoutes(getLocalBucket().getData(), routeId, routers);
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
-            findRoutes(bucket.getData(), routeId, routers);
+        @VisibleForTesting
+        public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = Preconditions.checkNotNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
         }
 
-        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
-    }
-
-    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
-            List<Pair<ActorRef, Long>> routers) {
-        if (table == null) {
-            return;
+        public ActorRef getRouter() {
+            return router;
         }
 
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
-        if(!routerWithUpdateTime.isEmpty()) {
-            routers.add(routerWithUpdateTime.get());
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
         }
     }
 
     /**
-     * All messages used by the RpcRegistry
+     * All messages used by the RpcRegistry.
      */
     public static class Messages {
+        abstract static class AbstractRouteMessage {
+            final List<DOMRpcIdentifier> rpcRouteIdentifiers;
 
-
-        public static class ContainsRoute {
-            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
-
-            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null &&
-                                            !routeIdentifiers.isEmpty(),
-                                            "Route Identifiers must be supplied");
-                this.routeIdentifiers = routeIdentifiers;
+            AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                Preconditions.checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
+                this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
             }
 
-            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
-                return this.routeIdentifiers;
+            List<DOMRpcIdentifier> getRouteIdentifiers() {
+                return this.rpcRouteIdentifiers;
             }
 
             @Override
             public String toString() {
-                return "ContainsRoute{" +
-                        "routeIdentifiers=" + routeIdentifiers +
-                        '}';
-            }
-        }
-
-        public static class AddOrUpdateRoutes extends ContainsRoute {
-
-            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                super(routeIdentifiers);
+                return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
             }
         }
 
-        public static class RemoveRoutes extends ContainsRoute {
-
-            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                super(routeIdentifiers);
-            }
-        }
-
-        public static class SetLocalRouter {
-            private final ActorRef router;
-
-            public SetLocalRouter(ActorRef router) {
-                Preconditions.checkArgument(router != null, "Router must not be null");
-                this.router = router;
-            }
-
-            public ActorRef getRouter() {
-                return this.router;
+        public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
 
-            @Override
-            public String toString() {
-                return "SetLocalRouter{" +
-                        "router=" + router +
-                        '}';
-            }
         }
 
-        public static class FindRouters {
-            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
-
-            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
-                this.routeIdentifier = routeIdentifier;
-            }
-
-            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
-                return routeIdentifier;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRouters{" +
-                        "routeIdentifier=" + routeIdentifier +
-                        '}';
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
         }
 
-        public static class FindRoutersReply {
-            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
 
-            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
-                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
-                this.routerWithUpdateTime = routerWithUpdateTime;
-            }
 
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
-                return routerWithUpdateTime;
+            @VisibleForTesting
+            public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+                this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
             }
 
-            @Override
-            public String toString() {
-                return "FindRoutersReply{" +
-                        "routerWithUpdateTime=" + routerWithUpdateTime +
-                        '}';
+            public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
+                return rpcEndpoints;
             }
         }
     }