Add support for reusable streaming
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
index a8407129999eade20e9a89f2563f255e94c45ed1..5dbc1cdb3950b42df9595e0c25c4ae5bb0cf7cc8 100644 (file)
 
 package org.opendaylight.controller.remote.rpc;
 
-
 import akka.actor.ActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
-import akka.japi.Creator;
-import akka.japi.Function;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
-
-import java.util.Set;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
+import org.opendaylight.mdsal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
- *
- * It also starts the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link RpcListener} with the local {@link DOMRpcService}.
  */
-
 public class RpcManager extends AbstractUntypedActor {
+    private final DOMRpcProviderService rpcProvisionRegistry;
+    private final RemoteRpcProviderConfig config;
+    private final DOMRpcService rpcServices;
+
+    private ListenerRegistration<RpcListener> listenerReg;
+    private ActorRef rpcInvoker;
+    private ActorRef rpcRegistry;
+    private ActorRef rpcRegistrar;
+
+    RpcManager(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+            final RemoteRpcProviderConfig config) {
+        this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
+        this.rpcServices = Preconditions.checkNotNull(rpcServices);
+        this.config = Preconditions.checkNotNull(config);
+    }
 
-  private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
-  private SchemaContext schemaContext;
-  private ActorRef rpcBroker;
-  private ActorRef rpcRegistry;
-  private final Broker.ProviderSession brokerSession;
-  private final RemoteRpcProviderConfig config;
-  private RpcListener rpcListener;
-  private RoutedRpcListener routeChangeListener;
-  private RemoteRpcImplementation rpcImplementation;
-  private final RpcProvisionRegistry rpcProvisionRegistry;
-
-  private RpcManager(SchemaContext schemaContext,
-                     Broker.ProviderSession brokerSession,
-                     RpcProvisionRegistry rpcProvisionRegistry) {
-    this.schemaContext = schemaContext;
-    this.brokerSession = brokerSession;
-    this.rpcProvisionRegistry = rpcProvisionRegistry;
-    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-
-    createRpcActors();
-    startListeners();
-  }
-
-
-  public static Props props(final SchemaContext schemaContext,
-                            final Broker.ProviderSession brokerSession,
-                            final RpcProvisionRegistry rpcProvisionRegistry) {
-    return Props.create(new Creator<RpcManager>() {
-      private static final long serialVersionUID = 1L;
-      @Override
-      public RpcManager create() throws Exception {
-        return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
-      }
-    });
-  }
-
-  private void createRpcActors() {
-    LOG.debug("Create rpc registry and broker actors");
-
-    rpcRegistry =
-            getContext().actorOf(Props.create(RpcRegistry.class).
-                withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
-    rpcBroker =
-            getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
-                withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+    public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+            final RemoteRpcProviderConfig config) {
+        Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+        Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+        Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
+        return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
+    }
 
-    RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
-    rpcRegistry.tell(localRouter, self());
-  }
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
 
-  private void startListeners() {
-    LOG.debug("Registers rpc listeners");
+        rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
+            .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+        LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
 
-    rpcListener = new RpcListener(rpcRegistry);
-    routeChangeListener = new RoutedRpcListener(rpcRegistry);
-    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
+        rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
+            .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+        LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
 
-    brokerSession.addRpcRegistrationListener(rpcListener);
-    rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-    rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
-    announceSupportedRpcs();
-  }
+        rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
+                .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+        LOG.debug("Propagating RPC information with {}", rpcRegistry);
 
-  /**
-   * Add all the locally registered RPCs in the clustered routing table
-   */
-  private void announceSupportedRpcs(){
-    LOG.debug("Adding all supported rpcs to routing table");
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      rpcListener.onRpcImplementationAdded(rpc);
+        final RpcListener rpcListener = new RpcListener(rpcRegistry);
+        LOG.debug("Registering local availabitility listener {}", rpcListener);
+        listenerReg = rpcServices.registerRpcListener(rpcListener);
     }
-  }
 
+    @Override
+    public void postStop() throws Exception {
+        if (listenerReg != null) {
+            listenerReg.close();
+            listenerReg = null;
+        }
 
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    if(message instanceof UpdateSchemaContext) {
-      updateSchemaContext((UpdateSchemaContext) message);
+        super.postStop();
     }
 
-  }
-
-  private void updateSchemaContext(UpdateSchemaContext message) {
-    this.schemaContext = message.getSchemaContext();
-  }
+    @Override
+    protected void handleReceive(final Object message) {
+        unknownMessage(message);
+    }
 
-  @Override
-  public SupervisorStrategy supervisorStrategy() {
-    return new OneForOneStrategy(10, Duration.create("1 minute"),
-        new Function<Throwable, SupervisorStrategy.Directive>() {
-          @Override
-          public SupervisorStrategy.Directive apply(Throwable t) {
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+        return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), t -> {
+            LOG.error("An exception happened actor will be resumed", t);
             return SupervisorStrategy.resume();
-          }
-        }
-    );
-  }
+        });
+    }
 }