Remove unneded RoutingTable time tracking
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 219646d8478ade824d22589842c4d4ddf1edccaa..c8415cc818733ff67bcc9828eaf96eceac916f7c 100644 (file)
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.japi.Option;
-import akka.japi.Pair;
+import akka.actor.Address;
+import akka.actor.Props;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
- * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ * <p>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final ActorRef rpcRegistrar;
 
-    public RpcRegistry() {
-        getLocalBucket().setData(new RoutingTable());
+    public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, new RoutingTable(rpcInvoker));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
-    @Override
-    protected void handleReceive(Object message) throws Exception {
-        //TODO: if sender is remote, reject message
+    /**
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+            final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
+    }
 
-        if (message instanceof SetLocalRouter) {
-            receiveSetLocalRouter((SetLocalRouter) message);
-        } else if (message instanceof AddOrUpdateRoutes) {
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-        } else if (message instanceof Messages.FindRouters) {
-            receiveGetRouter((FindRouters) message);
         } else {
             super.handleReceive(message);
         }
     }
 
-    /**
-     * Register's rpc broker
-     *
-     * @param message contains {@link akka.actor.ActorRef} for rpc broker
-     */
-    private void receiveSetLocalRouter(SetLocalRouter message) {
-        getLocalBucket().getData().setRouter(message.getRouter());
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
-     * @param msg
+     * Processes a RemoveRoutes message.
+     *
+     * @param msg contains list of route ids to remove
      */
-    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
-        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
-
-        RoutingTable table = getLocalBucket().getData().copy();
-        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.addRoute(routeId);
-        }
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
+        LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers()));
+    }
 
-        updateLocalBucket(table);
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
     }
 
-    /**
-     * @param msg contains list of route ids to remove
-     */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
+
+            final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
+            for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
+                if (ri instanceof RouteIdentifierImpl) {
+                    final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+                    rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+                } else {
+                    LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
+                }
+            }
 
-        RoutingTable table = getLocalBucket().getData().copy();
-        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.removeRoute(routeId);
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
         }
 
-        updateLocalBucket(table);
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+        }
     }
 
-    /**
-     * Finds routers for the given rpc.
-     *
-     * @param msg
-     */
-    private void receiveGetRouter(FindRouters msg) {
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
-        findRoutes(getLocalBucket().getData(), routeId, routers);
-
-        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
-            findRoutes(bucket.getData(), routeId, routers);
+        RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = Preconditions.checkNotNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
         }
 
-        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
-    }
-
-    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
-            List<Pair<ActorRef, Long>> routers) {
-        if (table == null) {
-            return;
+        public ActorRef getRouter() {
+            return router;
         }
 
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
-        if(!routerWithUpdateTime.isEmpty()) {
-            routers.add(routerWithUpdateTime.get());
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
         }
     }
 
     /**
-     * All messages used by the RpcRegistry
+     * All messages used by the RpcRegistry.
      */
     public static class Messages {
+        abstract static class AbstractRouteMessage {
+            final List<RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
-
-        public static class ContainsRoute {
-            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
-
-            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null &&
-                                            !routeIdentifiers.isEmpty(),
-                                            "Route Identifiers must be supplied");
+            AbstractRouteMessage(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
                 this.routeIdentifiers = routeIdentifiers;
             }
 
-            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+            List<RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
                 return this.routeIdentifiers;
             }
 
             @Override
             public String toString() {
-                return "ContainsRoute{" +
-                        "routeIdentifiers=" + routeIdentifiers +
-                        '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
             }
         }
 
-        public static class AddOrUpdateRoutes extends ContainsRoute {
-
-            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+        public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+            public AddOrUpdateRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
-        public static class RemoveRoutes extends ContainsRoute {
-
-            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
-        public static class SetLocalRouter {
-            private final ActorRef router;
-
-            public SetLocalRouter(ActorRef router) {
-                Preconditions.checkArgument(router != null, "Router must not be null");
-                this.router = router;
-            }
-
-            public ActorRef getRouter() {
-                return this.router;
-            }
-
-            @Override
-            public String toString() {
-                return "SetLocalRouter{" +
-                        "router=" + router +
-                        '}';
-            }
-        }
-
-        public static class FindRouters {
-            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
 
-            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
-                this.routeIdentifier = routeIdentifier;
+            UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+                this.endpoints = ImmutableMap.copyOf(endpoints);
             }
 
-            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
-                return routeIdentifier;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRouters{" +
-                        "routeIdentifier=" + routeIdentifier +
-                        '}';
-            }
-        }
-
-        public static class FindRoutersReply {
-            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
-
-            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
-                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
-                this.routerWithUpdateTime = routerWithUpdateTime;
-            }
-
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
-                return routerWithUpdateTime;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRoutersReply{" +
-                        "routerWithUpdateTime=" + routerWithUpdateTime +
-                        '}';
+            public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
+                return endpoints;
             }
         }
     }