BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
index 8e53bcba83464fd9b336f935ba03fc095be113a6..6f0e3b8f796380e9a0fa61704830562dccc4f303 100644 (file)
 
 package org.opendaylight.controller.remote.rpc;
 
-
 import akka.actor.ActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
-import akka.actor.SupervisorStrategy.Directive;
-import akka.japi.Function;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import scala.concurrent.duration.Duration;
 
 /**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts
- * the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link RpcListener} with the local {@link DOMRpcService}.
  */
-
 public class RpcManager extends AbstractUntypedActor {
-    private SchemaContext schemaContext;
-    private ActorRef rpcBroker;
-    private ActorRef rpcRegistry;
-    private final RemoteRpcProviderConfig config;
-    private RpcListener rpcListener;
-    private RemoteRpcImplementation rpcImplementation;
     private final DOMRpcProviderService rpcProvisionRegistry;
+    private final RemoteRpcProviderConfig config;
     private final DOMRpcService rpcServices;
 
-    private RpcManager(final SchemaContext schemaContext,
-                       final DOMRpcProviderService rpcProvisionRegistry,
-                       final DOMRpcService rpcSevices,
-                       final RemoteRpcProviderConfig config) {
-        this.schemaContext = schemaContext;
-        this.rpcProvisionRegistry = rpcProvisionRegistry;
-        rpcServices = rpcSevices;
-        this.config = config;
+    private ListenerRegistration<RpcListener> listenerReg;
+    private ActorRef rpcInvoker;
+    private ActorRef rpcRegistry;
+    private ActorRef rpcRegistrar;
 
-        createRpcActors();
-        startListeners();
+    private RpcManager(final DOMRpcProviderService rpcProvisionRegistry,
+                       final DOMRpcService rpcServices,
+                       final RemoteRpcProviderConfig config) {
+        this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
+        this.rpcServices = Preconditions.checkNotNull(rpcServices);
+        this.config = Preconditions.checkNotNull(config);
     }
 
-
-    public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
-            final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
-        Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+    public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+            final RemoteRpcProviderConfig config) {
         Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
         Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
-        return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
-    }
-
-    private void createRpcActors() {
-        LOG.debug("Create rpc registry and broker actors");
-
-        rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
-                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
-        rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
-                .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
-        final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
-        rpcRegistry.tell(localRouter, self());
+        Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
+        return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
     }
 
-    private void startListeners() {
-        LOG.debug("Registers rpc listeners");
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
 
-        rpcListener = new RpcListener(rpcRegistry);
-        rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
+        rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
+            .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+        LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
 
-        rpcServices.registerRpcListener(rpcListener);
+        rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
+            .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+        LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
 
-        registerRoutedRpcDelegate();
-        announceSupportedRpcs();
-    }
+        rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
+                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+        LOG.debug("Propagating RPC information with {}", rpcRegistry);
 
-    private void registerRoutedRpcDelegate() {
-        final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
-        final Set<Module> modules = schemaContext.getModules();
-        for (final Module module : modules) {
-            for (final RpcDefinition rpcDefinition : module.getRpcs()) {
-                if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
-                    LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
-                    rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
-                }
-            }
-        }
-        rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+        final RpcListener rpcListener = new RpcListener(rpcRegistry);
+        LOG.debug("Registering local availabitility listener {}", rpcListener);
+        listenerReg = rpcServices.registerRpcListener(rpcListener);
     }
 
-    /**
-     * Add all the locally registered RPCs in the clustered routing table.
-     */
-    private void announceSupportedRpcs() {
-        LOG.debug("Adding all supported rpcs to routing table");
-        final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
-        final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
-        for (final RpcDefinition rpcDef : currentlySupportedRpc) {
-            rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+    @Override
+    public void postStop() throws Exception {
+        if (listenerReg != null) {
+            listenerReg.close();
+            listenerReg = null;
         }
 
-        if (!rpcs.isEmpty()) {
-            rpcListener.onRpcAvailable(rpcs);
-        }
+        super.postStop();
     }
 
-
     @Override
-    protected void handleReceive(final Object message) throws Exception {
-        if (message instanceof UpdateSchemaContext) {
-            updateSchemaContext((UpdateSchemaContext) message);
-        }
-    }
-
-    private void updateSchemaContext(final UpdateSchemaContext message) {
-        schemaContext = message.getSchemaContext();
-        registerRoutedRpcDelegate();
-        rpcBroker.tell(message, ActorRef.noSender());
+    protected void handleReceive(final Object message) {
+        unknownMessage(message);
     }
 
     @Override
     public SupervisorStrategy supervisorStrategy() {
-        return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
+        return new OneForOneStrategy(10, Duration.create("1 minute"), t -> {
             LOG.error("An exception happened actor will be resumed", t);
-
             return SupervisorStrategy.resume();
         });
     }