BUG 4151 : Create a shared actor system
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
index f3cb78a30148e0a0de638120cfb6f973c822bc0f..fc75ea6089d0e045fb431cb1ff8caa55e532ca2a 100644 (file)
@@ -16,14 +16,18 @@ import akka.actor.SupervisorStrategy;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -45,18 +49,18 @@ public class RpcManager extends AbstractUntypedActor {
     private ActorRef rpcRegistry;
     private final RemoteRpcProviderConfig config;
     private RpcListener rpcListener;
-    private RoutedRpcListener routeChangeListener;
     private RemoteRpcImplementation rpcImplementation;
     private final DOMRpcProviderService rpcProvisionRegistry;
     private final DOMRpcService rpcServices;
 
     private RpcManager(final SchemaContext schemaContext,
                        final DOMRpcProviderService rpcProvisionRegistry,
-                       final DOMRpcService rpcSevices) {
+                       final DOMRpcService rpcSevices,
+                       final RemoteRpcProviderConfig config) {
         this.schemaContext = schemaContext;
         this.rpcProvisionRegistry = rpcProvisionRegistry;
         rpcServices = rpcSevices;
-        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+        this.config = config;
 
         createRpcActors();
         startListeners();
@@ -64,22 +68,23 @@ public class RpcManager extends AbstractUntypedActor {
 
 
       public static Props props(final SchemaContext schemaContext,
-              final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
+              final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+              final RemoteRpcProviderConfig config) {
           Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
           Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
           Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
-          return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
+          return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
       }
 
     private void createRpcActors() {
         LOG.debug("Create rpc registry and broker actors");
 
         rpcRegistry =
-                getContext().actorOf(RpcRegistry.props().
+                getContext().actorOf(RpcRegistry.props(config).
                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
 
         rpcBroker =
-                getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
+                getContext().actorOf(RpcBroker.props(rpcServices).
                     withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
 
         final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
@@ -90,16 +95,28 @@ public class RpcManager extends AbstractUntypedActor {
         LOG.debug("Registers rpc listeners");
 
         rpcListener = new RpcListener(rpcRegistry);
-        routeChangeListener = new RoutedRpcListener(rpcRegistry);
-        rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
+        rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
 
         rpcServices.registerRpcListener(rpcListener);
 
-//        rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-//        rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+        registerRoutedRpcDelegate();
         announceSupportedRpcs();
     }
 
+    private void registerRoutedRpcDelegate() {
+        final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+        final Set<Module> modules = schemaContext.getModules();
+        for(final Module module : modules){
+            for(final RpcDefinition rpcDefinition : module.getRpcs()){
+                if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
+                    LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
+                    rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
+                }
+            }
+        }
+        rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+    }
+
     /**
      * Add all the locally registered RPCs in the clustered routing table
      */
@@ -110,7 +127,9 @@ public class RpcManager extends AbstractUntypedActor {
         for (final RpcDefinition rpcDef : currentlySupportedRpc) {
             rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
         }
-        rpcListener.onRpcAvailable(rpcs);
+        if(!rpcs.isEmpty()) {
+            rpcListener.onRpcAvailable(rpcs);
+        }
     }
 
 
@@ -124,6 +143,7 @@ public class RpcManager extends AbstractUntypedActor {
 
     private void updateSchemaContext(final UpdateSchemaContext message) {
       schemaContext = message.getSchemaContext();
+      registerRoutedRpcDelegate();
       rpcBroker.tell(message, ActorRef.noSender());
     }
 
@@ -133,6 +153,8 @@ public class RpcManager extends AbstractUntypedActor {
           new Function<Throwable, SupervisorStrategy.Directive>() {
             @Override
             public SupervisorStrategy.Directive apply(final Throwable t) {
+              LOG.error("An exception happened actor will be resumed", t);
+
               return SupervisorStrategy.resume();
             }
           }