X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FRpcManager.java;h=fc75ea6089d0e045fb431cb1ff8caa55e532ca2a;hb=refs%2Fchanges%2F62%2F27562%2F9;hp=f3cb78a30148e0a0de638120cfb6f973c822bc0f;hpb=608760751ce7fcf4e84e86a8b33d43bc1d9984d6;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index f3cb78a301..fc75ea6089 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -16,14 +16,18 @@ import akka.actor.SupervisorStrategy; import akka.japi.Function; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -45,18 +49,18 @@ public class RpcManager extends AbstractUntypedActor { private ActorRef rpcRegistry; private final RemoteRpcProviderConfig config; private RpcListener rpcListener; - private RoutedRpcListener routeChangeListener; private RemoteRpcImplementation rpcImplementation; private final DOMRpcProviderService rpcProvisionRegistry; private final DOMRpcService rpcServices; private RpcManager(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry, - final DOMRpcService rpcSevices) { + final DOMRpcService rpcSevices, + final RemoteRpcProviderConfig config) { this.schemaContext = schemaContext; this.rpcProvisionRegistry = rpcProvisionRegistry; rpcServices = rpcSevices; - config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + this.config = config; createRpcActors(); startListeners(); @@ -64,22 +68,23 @@ public class RpcManager extends AbstractUntypedActor { public static Props props(final SchemaContext schemaContext, - final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) { + final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices, + final RemoteRpcProviderConfig config) { Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!"); Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!"); Preconditions.checkNotNull(rpcServices, "RpcService can not be null!"); - return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices); + return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config); } private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); rpcRegistry = - getContext().actorOf(RpcRegistry.props(). + getContext().actorOf(RpcRegistry.props(config). withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); rpcBroker = - getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry). + getContext().actorOf(RpcBroker.props(rpcServices). withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); @@ -90,16 +95,28 @@ public class RpcManager extends AbstractUntypedActor { LOG.debug("Registers rpc listeners"); rpcListener = new RpcListener(rpcRegistry); - routeChangeListener = new RoutedRpcListener(rpcRegistry); - rpcImplementation = new RemoteRpcImplementation(rpcBroker, config); + rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config); rpcServices.registerRpcListener(rpcListener); -// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); -// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation); + registerRoutedRpcDelegate(); announceSupportedRpcs(); } + private void registerRoutedRpcDelegate() { + final Set rpcIdentifiers = new HashSet<>(); + final Set modules = schemaContext.getModules(); + for(final Module module : modules){ + for(final RpcDefinition rpcDefinition : module.getRpcs()){ + if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) { + LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath()); + rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY)); + } + } + } + rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers); + } + /** * Add all the locally registered RPCs in the clustered routing table */ @@ -110,7 +127,9 @@ public class RpcManager extends AbstractUntypedActor { for (final RpcDefinition rpcDef : currentlySupportedRpc) { rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath())); } - rpcListener.onRpcAvailable(rpcs); + if(!rpcs.isEmpty()) { + rpcListener.onRpcAvailable(rpcs); + } } @@ -124,6 +143,7 @@ public class RpcManager extends AbstractUntypedActor { private void updateSchemaContext(final UpdateSchemaContext message) { schemaContext = message.getSchemaContext(); + registerRoutedRpcDelegate(); rpcBroker.tell(message, ActorRef.noSender()); } @@ -133,6 +153,8 @@ public class RpcManager extends AbstractUntypedActor { new Function() { @Override public SupervisorStrategy.Directive apply(final Throwable t) { + LOG.error("An exception happened actor will be resumed", t); + return SupervisorStrategy.resume(); } }