Convert sal-remoterpc to mdsal APIs
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 1545eb00d2cc9af4bda80881d197bad132f0e667..5ba97a306f7b68625253eaff0f258660139a4a27 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -21,31 +22,32 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
+import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
 
 /**
  * Registry to look up cluster nodes that have registered for a given RPC.
  *
  * <p>
- * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends BucketStore<RoutingTable> {
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
     private final ActorRef rpcRegistrar;
+    private final RemoteRpcRegistryMXBeanImpl mxBean;
 
     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
-        super(config, new RoutingTable(rpcInvoker));
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+        this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+                config.getAskDuration()), config.getAskDuration());
     }
 
     /**
@@ -62,25 +64,25 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     }
 
     @Override
-    protected void handleReceive(final Object message) throws Exception {
+    public void postStop() {
+        super.postStop();
+        this.mxBean.unregister();
+    }
+
+    @Override
+    protected void handleCommand(final Object message) throws Exception {
         if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else {
-            super.handleReceive(message);
+            super.handleCommand(message);
         }
     }
 
     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
-
-        RoutingTable table = getLocalBucket().getData().copy();
-        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.addRoute(routeId);
-        }
-
-        updateLocalBucket(table);
+        updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
@@ -89,12 +91,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      * @param msg contains list of route ids to remove
      */
     private void receiveRemoveRoutes(final RemoveRoutes msg) {
-        RoutingTable table = getLocalBucket().getData().copy();
-        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
-            table.removeRoute(routeId);
-        }
-
-        updateLocalBucket(table);
+        LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
     }
 
     @Override
@@ -109,18 +107,9 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
             final RoutingTable table = e.getValue().getData();
 
-            final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
-            for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
-                if (ri instanceof RouteIdentifierImpl) {
-                    final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
-                    rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
-                } else {
-                    LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
-                }
-            }
-
+            final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
             endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
-                    : Optional.of(new RemoteRpcEndpoint(table.getRouter(), rpcs)));
+                    : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
         }
 
         if (!endpoints.isEmpty()) {
@@ -132,7 +121,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         private final Set<DOMRpcIdentifier> rpcs;
         private final ActorRef router;
 
-        RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+        @VisibleForTesting
+        public RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
             this.router = Preconditions.checkNotNull(router);
             this.rpcs = ImmutableSet.copyOf(rpcs);
         }
@@ -151,15 +141,15 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      */
     public static class Messages {
         abstract static class AbstractRouteMessage {
-            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
+            final List<DOMRpcIdentifier> routeIdentifiers;
 
-            AbstractRouteMessage(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            AbstractRouteMessage(final Collection<DOMRpcIdentifier> routeIdentifiers) {
                 Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
                         "Route Identifiers must be supplied");
-                this.routeIdentifiers = routeIdentifiers;
+                this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers);
             }
 
-            List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+            List<DOMRpcIdentifier> getRouteIdentifiers() {
                 return this.routeIdentifiers;
             }
 
@@ -170,13 +160,13 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         }
 
         public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
-            public AddOrUpdateRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            public AddOrUpdateRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
 
         public static final class RemoveRoutes extends AbstractRouteMessage {
-            public RemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+            public RemoveRoutes(final Collection<DOMRpcIdentifier> routeIdentifiers) {
                 super(routeIdentifiers);
             }
         }
@@ -184,7 +174,8 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         public static final class UpdateRemoteEndpoints {
             private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
 
-            UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+            @VisibleForTesting
+            public UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
                 this.endpoints = ImmutableMap.copyOf(endpoints);
             }