BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index fc5b618663988d304d6b51468d2da9f99250d7b2..1545eb00d2cc9af4bda80881d197bad132f0e667 100644 (file)
@@ -8,80 +8,72 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.japi.Option;
-import akka.japi.Pair;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
- * Registry to look up cluster nodes that have registered for a given rpc.
+ * Registry to look up cluster nodes that have registered for a given RPC.
  *
  * <p>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
-    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
-    private final FiniteDuration findRouterTimeout;
+    private final ActorRef rpcRegistrar;
 
-    public RpcRegistry(RemoteRpcProviderConfig config) {
-        super(config);
-        getLocalBucket().setData(new RoutingTable());
-        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
+    public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, new RoutingTable(rpcInvoker));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
-    public static Props props(RemoteRpcProviderConfig config) {
-        return Props.create(RpcRegistry.class, config);
+    /**
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+            final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
     }
 
     @Override
-    protected void handleReceive(Object message) throws Exception {
-        //TODO: if sender is remote, reject message
-
-        if (message instanceof SetLocalRouter) {
-            receiveSetLocalRouter((SetLocalRouter) message);
-        } else if (message instanceof AddOrUpdateRoutes) {
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-        } else if (message instanceof Messages.FindRouters) {
-            receiveGetRouter((FindRouters) message);
-        } else if (message instanceof Runnable) {
-            ((Runnable)message).run();
         } else {
             super.handleReceive(message);
         }
     }
 
-    /**
-     * Registers a rpc broker.
-     *
-     * @param message contains {@link akka.actor.ActorRef} for rpc broker
-     */
-    private void receiveSetLocalRouter(SetLocalRouter message) {
-        getLocalBucket().getData().setRouter(message.getRouter());
-    }
-
-    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
-        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
         RoutingTable table = getLocalBucket().getData().copy();
         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
@@ -89,8 +81,6 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         updateLocalBucket(table);
-
-        onBucketsUpdated();
     }
 
     /**
@@ -98,8 +88,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      *
      * @param msg contains list of route ids to remove
      */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
-
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
         RoutingTable table = getLocalBucket().getData().copy();
         for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
             table.removeRoute(routeId);
@@ -108,85 +97,52 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         updateLocalBucket(table);
     }
 
-    /**
-     * Finds routers for the given rpc.
-     *
-     * @param findRouters the FindRouters request
-     */
-    private void receiveGetRouter(final FindRouters findRouters) {
-        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
-
-        final ActorRef sender = getSender();
-        if (!findRouters(findRouters, sender)) {
-            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
-                    findRouterTimeout.toMillis());
-
-            final AtomicReference<Cancellable> timer = new AtomicReference<>();
-            final Runnable routesUpdatedRunnable = new Runnable() {
-                @Override
-                public void run() {
-                    if (findRouters(findRouters, sender)) {
-                        routesUpdatedCallbacks.remove(this);
-                        timer.get().cancel();
-                    }
-                }
-            };
-
-            routesUpdatedCallbacks.add(routesUpdatedRunnable);
-
-            Runnable timerRunnable = () -> {
-                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
-
-                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
-                sender.tell(new Messages.FindRoutersReply(
-                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
-            };
-
-            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
-                    getContext().dispatcher(), self()));
-        }
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
     }
 
-    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
-        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
-        findRoutes(getLocalBucket().getData(), routeId, routers);
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
+
+            final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
+            for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
+                if (ri instanceof RouteIdentifierImpl) {
+                    final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+                    rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+                } else {
+                    LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
+                }
+            }
 
-        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
-            findRoutes(bucket.getData(), routeId, routers);
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
         }
 
-        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
-
-        boolean foundRouters = !routers.isEmpty();
-        if (foundRouters) {
-            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
         }
-
-        return foundRouters;
     }
 
-    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
-            List<Pair<ActorRef, Long>> routers) {
-        if (table == null) {
-            return;
-        }
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
-        if (!routerWithUpdateTime.isEmpty()) {
-            routers.add(routerWithUpdateTime.get());
+        RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = Preconditions.checkNotNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
         }
-    }
 
-    @Override
-    protected void onBucketsUpdated() {
-        if (routesUpdatedCallbacks.isEmpty()) {
-            return;
+        public ActorRef getRouter() {
+            return router;
         }
 
-        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
-            callBack.run();
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
         }
     }
 
@@ -194,18 +150,16 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      * All messages used by the RpcRegistry.
      */
     public static class Messages {
-
-
-        public static class ContainsRoute {
+        abstract static class AbstractRouteMessage {
             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
-            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            AbstractRouteMessage(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
                         "Route Identifiers must be supplied");
                 this.routeIdentifiers = routeIdentifiers;
             }
 
-            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+            List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
                 return this.routeIdentifiers;
             }
 
@@ -215,71 +169,27 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             }
         }
 
-        public static class AddOrUpdateRoutes extends ContainsRoute {
-
-            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+        public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+            public AddOrUpdateRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
-        public static class RemoveRoutes extends ContainsRoute {
-
-            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
-        public static class SetLocalRouter {
-            private final ActorRef router;
-
-            public SetLocalRouter(ActorRef router) {
-                Preconditions.checkArgument(router != null, "Router must not be null");
-                this.router = router;
-            }
-
-            public ActorRef getRouter() {
-                return this.router;
-            }
-
-            @Override
-            public String toString() {
-                return "SetLocalRouter{" + "router=" + router + '}';
-            }
-        }
-
-        public static class FindRouters {
-            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
-
-            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
-                this.routeIdentifier = routeIdentifier;
-            }
-
-            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
-                return routeIdentifier;
-            }
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
 
-            @Override
-            public String toString() {
-                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
+            UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+                this.endpoints = ImmutableMap.copyOf(endpoints);
             }
-        }
 
-        public static class FindRoutersReply {
-            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
-
-            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
-                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
-                this.routerWithUpdateTime = routerWithUpdateTime;
-            }
-
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
-                return routerWithUpdateTime;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
+            public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
+                return endpoints;
             }
         }
     }