Add support for reusable streaming
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
index f3cb78a30148e0a0de638120cfb6f973c822bc0f..5dbc1cdb3950b42df9595e0c25c4ae5bb0cf7cc8 100644 (file)
 
 package org.opendaylight.controller.remote.rpc;
 
-
 import akka.actor.ActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
-import akka.japi.Function;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
- *
- * It also starts the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link RpcListener} with the local {@link DOMRpcService}.
  */
-
 public class RpcManager extends AbstractUntypedActor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
-    private SchemaContext schemaContext;
-    private ActorRef rpcBroker;
-    private ActorRef rpcRegistry;
-    private final RemoteRpcProviderConfig config;
-    private RpcListener rpcListener;
-    private RoutedRpcListener routeChangeListener;
-    private RemoteRpcImplementation rpcImplementation;
     private final DOMRpcProviderService rpcProvisionRegistry;
+    private final RemoteRpcProviderConfig config;
     private final DOMRpcService rpcServices;
 
-    private RpcManager(final SchemaContext schemaContext,
-                       final DOMRpcProviderService rpcProvisionRegistry,
-                       final DOMRpcService rpcSevices) {
-        this.schemaContext = schemaContext;
-        this.rpcProvisionRegistry = rpcProvisionRegistry;
-        rpcServices = rpcSevices;
-        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    private ListenerRegistration<RpcListener> listenerReg;
+    private ActorRef rpcInvoker;
+    private ActorRef rpcRegistry;
+    private ActorRef rpcRegistrar;
 
-        createRpcActors();
-        startListeners();
+    RpcManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+            final RemoteRpcProviderConfig config) {
+        this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
+        this.rpcServices = Preconditions.checkNotNull(rpcServices);
+        this.config = Preconditions.checkNotNull(config);
     }
 
-
-      public static Props props(final SchemaContext schemaContext,
-              final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
-          Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
-          Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
-          Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
-          return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
-      }
-
-    private void createRpcActors() {
-        LOG.debug("Create rpc registry and broker actors");
-
-        rpcRegistry =
-                getContext().actorOf(RpcRegistry.props().
-                    withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
-        rpcBroker =
-                getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
-                    withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
-        final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
-        rpcRegistry.tell(localRouter, self());
+    public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+            final RemoteRpcProviderConfig config) {
+        Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+        Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+        Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
+        return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
     }
 
-    private void startListeners() {
-        LOG.debug("Registers rpc listeners");
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
 
-        rpcListener = new RpcListener(rpcRegistry);
-        routeChangeListener = new RoutedRpcListener(rpcRegistry);
-        rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
+        rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
+            .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+        LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
 
-        rpcServices.registerRpcListener(rpcListener);
+        rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
+            .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+        LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
 
-//        rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-//        rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
-        announceSupportedRpcs();
-    }
+        rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
+                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+        LOG.debug("Propagating RPC information with {}", rpcRegistry);
 
-    /**
-     * Add all the locally registered RPCs in the clustered routing table
-     */
-    private void announceSupportedRpcs(){
-        LOG.debug("Adding all supported rpcs to routing table");
-        final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
-        final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
-        for (final RpcDefinition rpcDef : currentlySupportedRpc) {
-            rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
-        }
-        rpcListener.onRpcAvailable(rpcs);
+        final RpcListener rpcListener = new RpcListener(rpcRegistry);
+        LOG.debug("Registering local availabitility listener {}", rpcListener);
+        listenerReg = rpcServices.registerRpcListener(rpcListener);
     }
 
-
     @Override
-    protected void handleReceive(final Object message) throws Exception {
-      if(message instanceof UpdateSchemaContext) {
-        updateSchemaContext((UpdateSchemaContext) message);
-      }
+    public void postStop() throws Exception {
+        if (listenerReg != null) {
+            listenerReg.close();
+            listenerReg = null;
+        }
 
+        super.postStop();
     }
 
-    private void updateSchemaContext(final UpdateSchemaContext message) {
-      schemaContext = message.getSchemaContext();
-      rpcBroker.tell(message, ActorRef.noSender());
+    @Override
+    protected void handleReceive(final Object message) {
+        unknownMessage(message);
     }
 
     @Override
     public SupervisorStrategy supervisorStrategy() {
-      return new OneForOneStrategy(10, Duration.create("1 minute"),
-          new Function<Throwable, SupervisorStrategy.Directive>() {
-            @Override
-            public SupervisorStrategy.Directive apply(final Throwable t) {
-              return SupervisorStrategy.resume();
-            }
-          }
-      );
+        return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
+            LOG.error("An exception happened actor will be resumed", t);
+            return SupervisorStrategy.resume();
+        });
     }
 }