Teach sal-remoterpc-connector to route actions
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 76f59304576b731f1a15df9c277628ad9ff611d6..68fead4407781f534b0f9b4aee3ae7e2b012f908 100644 (file)
@@ -10,384 +10,179 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Option;
-import akka.japi.Pair;
-import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 
 /**
- * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends UntypedActor {
-
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
+    private final ActorRef rpcRegistrar;
+    private final RemoteRpcRegistryMXBeanImpl mxBean;
+
+    public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+        this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+                config.getAskDuration()), config.getAskDuration());
+    }
 
     /**
-     * Rpc broker that would use the registry to route requests.
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
      */
-    private ActorRef localRouter;
-
-    public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
-        log.info("Bucket store path = {}", bucketStore.path().toString());
+    public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+                              final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
     }
 
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
+    @Override
+    public void postStop() {
+        super.postStop();
+        this.mxBean.unregister();
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: message [{}]", message);
-
-        //TODO: if sender is remote, reject message
-
-        if (message instanceof SetLocalRouter)
-            receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoutes)
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
-
-        else if (message instanceof RemoveRoutes)
+        } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-
-        else if (message instanceof Messages.FindRouters)
-            receiveGetRouter((FindRouters) message);
-
-        else
-            unhandled(message);
-    }
-
-    /**
-     * Register's rpc broker
-     *
-     * @param message contains {@link akka.actor.ActorRef} for rpc broker
-     */
-    private void receiveSetLocalRouter(SetLocalRouter message) {
-        localRouter = message.getRouter();
-    }
-
-    /**
-     * @param msg
-     */
-    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
-        Preconditions.checkState(localRouter != null, "Router must be set first");
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
-        futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        } else {
+            super.handleCommand(message);
+        }
     }
 
-    /**
-     * @param msg contains list of route ids to remove
-     */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
-        futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
-
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
-     * Finds routers for the given rpc.
+     * Processes a RemoveRoutes message.
      *
-     * @param msg
+     * @param msg contains list of route ids to remove
      */
-    private void receiveGetRouter(FindRouters msg) {
-        final ActorRef sender = getSender();
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
+        LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
     }
 
-    /**
-     * Helper to create empty reply when no routers are found
-     *
-     * @return
-     */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+                ActorRef.noSender());
     }
 
-    /**
-     * Helper to create a reply when routers are found for the given rpc
-     *
-     * @param buckets
-     * @param routeId
-     * @return
-     */
-    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
-        for (Bucket bucket : buckets.values()) {
-
-            RoutingTable table = (RoutingTable) bucket.getData();
-            if (table == null)
-                continue;
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
 
-            routerWithUpdateTime = table.getRouterFor(routeId);
-            if (routerWithUpdateTime.isEmpty())
-                continue;
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
 
-            routers.add(routerWithUpdateTime.get());
+            final Collection<DOMRpcIdentifier> rpcs = table.getItems();
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
         }
 
-        return new Messages.FindRoutersReply(routers);
-    }
-
-
-    ///
-    ///private factories to create Mapper
-    ///
-
-    /**
-     * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
-     *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-
-                if (replyMessage instanceof GetAllBucketsReply) {
-
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
-
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
-                    }
-
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-
-                    if (!table.isEmpty()) {
-                        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                            table.removeRoute(routeId);
-                        }
-                    }
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+        }
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                        table.addRoute(routeId);
-                    }
-
-                    bucket.setData(table);
+        @VisibleForTesting
+        public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = Preconditions.checkNotNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
+        }
 
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
+        public ActorRef getRouter() {
+            return router;
+        }
 
-                return null;
-            }
-        };
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
+        }
     }
 
     /**
-     * All messages used by the RpcRegistry
+     * All messages used by the RpcRegistry.
      */
     public static class Messages {
+        abstract static class AbstractRouteMessage {
+            final List<DOMRpcIdentifier> rpcRouteIdentifiers;
 
-
-        public static class ContainsRoute {
-            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
-
-            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null &&
-                                            !routeIdentifiers.isEmpty(),
-                                            "Route Identifiers must be supplied");
-                this.routeIdentifiers = routeIdentifiers;
+            AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                Preconditions.checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
+                this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
             }
 
-            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
-                return this.routeIdentifiers;
+            List<DOMRpcIdentifier> getRouteIdentifiers() {
+                return this.rpcRouteIdentifiers;
             }
 
             @Override
             public String toString() {
-                return "ContainsRoute{" +
-                        "routeIdentifiers=" + routeIdentifiers +
-                        '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
             }
         }
 
-        public static class AddOrUpdateRoutes extends ContainsRoute {
-
-            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                super(routeIdentifiers);
+        public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
-        }
-
-        public static class RemoveRoutes extends ContainsRoute {
 
-            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                super(routeIdentifiers);
-            }
         }
 
-        public static class SetLocalRouter {
-            private final ActorRef router;
-
-            public SetLocalRouter(ActorRef router) {
-                Preconditions.checkArgument(router != null, "Router must not be null");
-                this.router = router;
-            }
-
-            public ActorRef getRouter() {
-                return this.router;
-            }
-
-            @Override
-            public String toString() {
-                return "SetLocalRouter{" +
-                        "router=" + router +
-                        '}';
-            }
-        }
-
-        public static class FindRouters {
-            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
-
-            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
-                this.routeIdentifier = routeIdentifier;
-            }
-
-            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
-                return routeIdentifier;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRouters{" +
-                        "routeIdentifier=" + routeIdentifier +
-                        '}';
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
         }
 
-        public static class FindRoutersReply {
-            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
 
-            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
-                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
-                this.routerWithUpdateTime = routerWithUpdateTime;
-            }
 
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
-                return routerWithUpdateTime;
+            @VisibleForTesting
+            public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+                this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
             }
 
-            @Override
-            public String toString() {
-                return "FindRoutersReply{" +
-                        "routerWithUpdateTime=" + routerWithUpdateTime +
-                        '}';
+            public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
+                return rpcEndpoints;
             }
         }
     }