Register MXBean only during start
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 7c5efc29e8640c6028ca829fd022b12aea010400..2c89f1426072a6e945de5ed107fd3b07a53ef7df 100644 (file)
@@ -7,91 +7,92 @@
  */
 package org.opendaylight.controller.remote.rpc.registry;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.japi.Creator;
-import akka.japi.Option;
-import akka.japi.Pair;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
 import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 
 /**
- * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * Registry to look up cluster nodes that have registered for a given RPC.
+ *
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends BucketStore<RoutingTable> {
-    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
-    private final FiniteDuration findRouterTimeout;
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
+    private final ActorRef rpcRegistrar;
+    private RemoteRpcRegistryMXBeanImpl mxBean;
+
+    public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+        this.rpcRegistrar = requireNonNull(rpcRegistrar);
 
-    public RpcRegistry(RemoteRpcProviderConfig config) {
-        super(config);
-        getLocalBucket().setData(new RoutingTable());
-        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
     }
 
-    public static Props props(RemoteRpcProviderConfig config) {
-        return Props.create(new RpcRegistryCreator(config));
+    /**
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker,
+                              final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
     }
 
     @Override
-    protected void handleReceive(Object message) throws Exception {
-        //TODO: if sender is remote, reject message
+    public void preStart() {
+        super.preStart();
+        mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+            getConfig().getAskDuration()), getConfig().getAskDuration());
+    }
 
-        if (message instanceof SetLocalRouter) {
-            receiveSetLocalRouter((SetLocalRouter) message);
-        } else if (message instanceof AddOrUpdateRoutes) {
+    @Override
+    public void postStop() throws Exception {
+        if (mxBean != null) {
+            mxBean.unregister();
+            mxBean = null;
+        }
+        super.postStop();
+    }
+
+    @Override
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-        } else if (message instanceof Messages.FindRouters) {
-            receiveGetRouter((FindRouters) message);
-        } else if (message instanceof Runnable) {
-            ((Runnable)message).run();
         } else {
-            super.handleReceive(message);
+            super.handleCommand(message);
         }
     }
 
-    /**
-     * Registers a rpc broker.
-     *
-     * @param message contains {@link akka.actor.ActorRef} for rpc broker
-     */
-    private void receiveSetLocalRouter(SetLocalRouter message) {
-        getLocalBucket().getData().setRouter(message.getRouter());
-    }
-
-    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
-
-        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
-
-        RoutingTable table = getLocalBucket().getData().copy();
-        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.addRoute(routeId);
-        }
-
-        updateLocalBucket(table);
-
-        onBucketsUpdated();
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
@@ -99,95 +100,50 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      *
      * @param msg contains list of route ids to remove
      */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
-
-        RoutingTable table = getLocalBucket().getData().copy();
-        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.removeRoute(routeId);
-        }
-
-        updateLocalBucket(table);
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
+        LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
     }
 
-    /**
-     * Finds routers for the given rpc.
-     *
-     * @param findRouters the FindRouters request
-     */
-    private void receiveGetRouter(final FindRouters findRouters) {
-        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
-
-        final ActorRef sender = getSender();
-        if (!findRouters(findRouters, sender)) {
-            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
-                    findRouterTimeout.toMillis());
-
-            final AtomicReference<Cancellable> timer = new AtomicReference<>();
-            final Runnable routesUpdatedRunnable = new Runnable() {
-                @Override
-                public void run() {
-                    if (findRouters(findRouters, sender)) {
-                        routesUpdatedCallbacks.remove(this);
-                        timer.get().cancel();
-                    }
-                }
-            };
-
-            routesUpdatedCallbacks.add(routesUpdatedRunnable);
-
-            Runnable timerRunnable = () -> {
-                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
-
-                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
-                sender.tell(new Messages.FindRoutersReply(
-                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
-            };
-
-            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
-                    getContext().dispatcher(), self()));
-        }
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())),
+                ActorRef.noSender());
     }
 
-    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
 
-        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
-        findRoutes(getLocalBucket().getData(), routeId, routers);
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
 
-        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
-            findRoutes(bucket.getData(), routeId, routers);
+            final Collection<DOMRpcIdentifier> rpcs = table.getItems();
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs)));
         }
 
-        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
-
-        boolean foundRouters = !routers.isEmpty();
-        if (foundRouters) {
-            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
         }
-
-        return foundRouters;
     }
 
-    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
-            List<Pair<ActorRef, Long>> routers) {
-        if (table == null) {
-            return;
-        }
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
-        if (!routerWithUpdateTime.isEmpty()) {
-            routers.add(routerWithUpdateTime.get());
+        @VisibleForTesting
+        public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = requireNonNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
         }
-    }
 
-    @Override
-    protected void onBucketsUpdated() {
-        if (routesUpdatedCallbacks.isEmpty()) {
-            return;
+        public ActorRef getRouter() {
+            return router;
         }
 
-        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
-            callBack.run();
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
         }
     }
 
@@ -195,109 +151,50 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      * All messages used by the RpcRegistry.
      */
     public static class Messages {
+        abstract static class AbstractRouteMessage {
+            final List<DOMRpcIdentifier> rpcRouteIdentifiers;
 
-
-        public static class ContainsRoute {
-            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
-
-            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+            AbstractRouteMessage(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(),
                         "Route Identifiers must be supplied");
-                this.routeIdentifiers = routeIdentifiers;
+                this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers);
             }
 
-            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
-                return this.routeIdentifiers;
+            List<DOMRpcIdentifier> getRouteIdentifiers() {
+                return this.rpcRouteIdentifiers;
             }
 
             @Override
             public String toString() {
-                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}';
             }
         }
 
-        public static class AddOrUpdateRoutes extends ContainsRoute {
-
-            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                super(routeIdentifiers);
+        public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage {
+            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
-        }
-
-        public static class RemoveRoutes extends ContainsRoute {
 
-            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                super(routeIdentifiers);
-            }
         }
 
-        public static class SetLocalRouter {
-            private final ActorRef router;
-
-            public SetLocalRouter(ActorRef router) {
-                Preconditions.checkArgument(router != null, "Router must not be null");
-                this.router = router;
-            }
-
-            public ActorRef getRouter() {
-                return this.router;
-            }
-
-            @Override
-            public String toString() {
-                return "SetLocalRouter{" + "router=" + router + '}';
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final Collection<DOMRpcIdentifier> rpcRouteIdentifiers) {
+                super(rpcRouteIdentifiers);
             }
         }
 
-        public static class FindRouters {
-            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints;
 
-            public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
-                this.routeIdentifier = routeIdentifier;
-            }
 
-            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
-                return routeIdentifier;
+            @VisibleForTesting
+            public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> rpcEndpoints) {
+                this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints);
             }
 
-            @Override
-            public String toString() {
-                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
-            }
-        }
-
-        public static class FindRoutersReply {
-            final List<Pair<ActorRef, Long>> routerWithUpdateTime;
-
-            public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
-                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
-                this.routerWithUpdateTime = routerWithUpdateTime;
-            }
-
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
-                return routerWithUpdateTime;
-            }
-
-            @Override
-            public String toString() {
-                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
+            public Map<Address, Optional<RemoteRpcEndpoint>> getRpcEndpoints() {
+                return rpcEndpoints;
             }
         }
     }
-
-    private static class RpcRegistryCreator implements Creator<RpcRegistry> {
-        private static final long serialVersionUID = 1L;
-        private final RemoteRpcProviderConfig config;
-
-        private RpcRegistryCreator(RemoteRpcProviderConfig config) {
-            this.config = config;
-        }
-
-        @Override
-        public RpcRegistry create() throws Exception {
-            RpcRegistry registry =  new RpcRegistry(config);
-            new RemoteRpcRegistryMXBeanImpl(registry);
-            return registry;
-        }
-    }
 }