Removed uncessary calls to RpcBroker to find routes.
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
index a8407129999eade20e9a89f2563f255e94c45ed1..1ade84bd0fc1bee9fb8ad8db9ac33939cf591422 100644 (file)
@@ -13,21 +13,27 @@ import akka.actor.ActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
-import akka.japi.Creator;
 import akka.japi.Function;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.util.Set;
-
 /**
  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
  *
@@ -36,104 +42,120 @@ import java.util.Set;
 
 public class RpcManager extends AbstractUntypedActor {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
-  private SchemaContext schemaContext;
-  private ActorRef rpcBroker;
-  private ActorRef rpcRegistry;
-  private final Broker.ProviderSession brokerSession;
-  private final RemoteRpcProviderConfig config;
-  private RpcListener rpcListener;
-  private RoutedRpcListener routeChangeListener;
-  private RemoteRpcImplementation rpcImplementation;
-  private final RpcProvisionRegistry rpcProvisionRegistry;
-
-  private RpcManager(SchemaContext schemaContext,
-                     Broker.ProviderSession brokerSession,
-                     RpcProvisionRegistry rpcProvisionRegistry) {
-    this.schemaContext = schemaContext;
-    this.brokerSession = brokerSession;
-    this.rpcProvisionRegistry = rpcProvisionRegistry;
-    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-
-    createRpcActors();
-    startListeners();
-  }
-
-
-  public static Props props(final SchemaContext schemaContext,
-                            final Broker.ProviderSession brokerSession,
-                            final RpcProvisionRegistry rpcProvisionRegistry) {
-    return Props.create(new Creator<RpcManager>() {
-      private static final long serialVersionUID = 1L;
-      @Override
-      public RpcManager create() throws Exception {
-        return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
+    private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
+
+    private SchemaContext schemaContext;
+    private ActorRef rpcBroker;
+    private ActorRef rpcRegistry;
+    private final RemoteRpcProviderConfig config;
+    private RpcListener rpcListener;
+    private RemoteRpcImplementation rpcImplementation;
+    private final DOMRpcProviderService rpcProvisionRegistry;
+    private final DOMRpcService rpcServices;
+
+    private RpcManager(final SchemaContext schemaContext,
+                       final DOMRpcProviderService rpcProvisionRegistry,
+                       final DOMRpcService rpcSevices) {
+        this.schemaContext = schemaContext;
+        this.rpcProvisionRegistry = rpcProvisionRegistry;
+        rpcServices = rpcSevices;
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+
+        createRpcActors();
+        startListeners();
+    }
+
+
+      public static Props props(final SchemaContext schemaContext,
+              final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
+          Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+          Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+          Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+          return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
       }
-    });
-  }
-
-  private void createRpcActors() {
-    LOG.debug("Create rpc registry and broker actors");
-
-    rpcRegistry =
-            getContext().actorOf(Props.create(RpcRegistry.class).
-                withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
-    rpcBroker =
-            getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
-                withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
-    RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
-    rpcRegistry.tell(localRouter, self());
-  }
-
-  private void startListeners() {
-    LOG.debug("Registers rpc listeners");
-
-    rpcListener = new RpcListener(rpcRegistry);
-    routeChangeListener = new RoutedRpcListener(rpcRegistry);
-    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
-
-    brokerSession.addRpcRegistrationListener(rpcListener);
-    rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-    rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
-    announceSupportedRpcs();
-  }
-
-  /**
-   * Add all the locally registered RPCs in the clustered routing table
-   */
-  private void announceSupportedRpcs(){
-    LOG.debug("Adding all supported rpcs to routing table");
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      rpcListener.onRpcImplementationAdded(rpc);
+
+    private void createRpcActors() {
+        LOG.debug("Create rpc registry and broker actors");
+
+        rpcRegistry =
+                getContext().actorOf(RpcRegistry.props().
+                    withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+
+        rpcBroker =
+                getContext().actorOf(RpcBroker.props(rpcServices).
+                    withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+
+        final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
+        rpcRegistry.tell(localRouter, self());
     }
-  }
 
+    private void startListeners() {
+        LOG.debug("Registers rpc listeners");
+
+        rpcListener = new RpcListener(rpcRegistry);
+        rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
 
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    if(message instanceof UpdateSchemaContext) {
-      updateSchemaContext((UpdateSchemaContext) message);
+        rpcServices.registerRpcListener(rpcListener);
+
+        registerRoutedRpcDelegate();
+        announceSupportedRpcs();
     }
 
-  }
+    private void registerRoutedRpcDelegate() {
+        final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+        final Set<Module> modules = schemaContext.getModules();
+        for(final Module module : modules){
+            for(final RpcDefinition rpcDefinition : module.getRpcs()){
+                if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
+                    LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
+                    rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
+                }
+            }
+        }
+        rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+    }
+
+    /**
+     * Add all the locally registered RPCs in the clustered routing table
+     */
+    private void announceSupportedRpcs(){
+        LOG.debug("Adding all supported rpcs to routing table");
+        final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
+        final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
+        for (final RpcDefinition rpcDef : currentlySupportedRpc) {
+            rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+        }
+        if(!rpcs.isEmpty()) {
+            rpcListener.onRpcAvailable(rpcs);
+        }
+    }
 
-  private void updateSchemaContext(UpdateSchemaContext message) {
-    this.schemaContext = message.getSchemaContext();
-  }
 
-  @Override
-  public SupervisorStrategy supervisorStrategy() {
-    return new OneForOneStrategy(10, Duration.create("1 minute"),
-        new Function<Throwable, SupervisorStrategy.Directive>() {
-          @Override
-          public SupervisorStrategy.Directive apply(Throwable t) {
-            return SupervisorStrategy.resume();
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+      if(message instanceof UpdateSchemaContext) {
+        updateSchemaContext((UpdateSchemaContext) message);
+      }
+
+    }
+
+    private void updateSchemaContext(final UpdateSchemaContext message) {
+      schemaContext = message.getSchemaContext();
+      registerRoutedRpcDelegate();
+      rpcBroker.tell(message, ActorRef.noSender());
+    }
+
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+      return new OneForOneStrategy(10, Duration.create("1 minute"),
+          new Function<Throwable, SupervisorStrategy.Directive>() {
+            @Override
+            public SupervisorStrategy.Directive apply(final Throwable t) {
+              LOG.error("An exception happened actor will be resumed", t);
+
+              return SupervisorStrategy.resume();
+            }
           }
-        }
-    );
-  }
+      );
+    }
 }