X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FRpcManager.java;h=8e53bcba83464fd9b336f935ba03fc095be113a6;hb=9ddc65e1ddae50f691566cd9382707679436c055;hp=f3cb78a30148e0a0de638120cfb6f973c822bc0f;hpb=e2445278ab489d998aab6d91d46e8dc3febd1c32;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index f3cb78a301..8e53bcba83 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -13,74 +13,71 @@ import akka.actor.ActorRef; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.actor.SupervisorStrategy.Directive; import akka.japi.Function; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; /** - * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. - * - * It also starts the rpc listeners + * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts + * the rpc listeners */ public class RpcManager extends AbstractUntypedActor { - - private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class); - private SchemaContext schemaContext; private ActorRef rpcBroker; private ActorRef rpcRegistry; private final RemoteRpcProviderConfig config; private RpcListener rpcListener; - private RoutedRpcListener routeChangeListener; private RemoteRpcImplementation rpcImplementation; private final DOMRpcProviderService rpcProvisionRegistry; private final DOMRpcService rpcServices; private RpcManager(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry, - final DOMRpcService rpcSevices) { + final DOMRpcService rpcSevices, + final RemoteRpcProviderConfig config) { this.schemaContext = schemaContext; this.rpcProvisionRegistry = rpcProvisionRegistry; rpcServices = rpcSevices; - config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + this.config = config; createRpcActors(); startListeners(); } - public static Props props(final SchemaContext schemaContext, - final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) { - Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!"); - Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!"); - Preconditions.checkNotNull(rpcServices, "RpcService can not be null!"); - return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices); - } + public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry, + final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) { + Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!"); + Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!"); + Preconditions.checkNotNull(rpcServices, "RpcService can not be null!"); + return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config); + } private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); - rpcRegistry = - getContext().actorOf(RpcRegistry.props(). - withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); + rpcRegistry = getContext().actorOf(RpcRegistry.props(config) + .withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); - rpcBroker = - getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry). - withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); + rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices) + .withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); rpcRegistry.tell(localRouter, self()); @@ -90,52 +87,64 @@ public class RpcManager extends AbstractUntypedActor { LOG.debug("Registers rpc listeners"); rpcListener = new RpcListener(rpcRegistry); - routeChangeListener = new RoutedRpcListener(rpcRegistry); - rpcImplementation = new RemoteRpcImplementation(rpcBroker, config); + rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config); rpcServices.registerRpcListener(rpcListener); -// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); -// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation); + registerRoutedRpcDelegate(); announceSupportedRpcs(); } + private void registerRoutedRpcDelegate() { + final Set rpcIdentifiers = new HashSet<>(); + final Set modules = schemaContext.getModules(); + for (final Module module : modules) { + for (final RpcDefinition rpcDefinition : module.getRpcs()) { + if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) { + LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath()); + rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY)); + } + } + } + rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers); + } + /** - * Add all the locally registered RPCs in the clustered routing table + * Add all the locally registered RPCs in the clustered routing table. */ - private void announceSupportedRpcs(){ + private void announceSupportedRpcs() { LOG.debug("Adding all supported rpcs to routing table"); final Set currentlySupportedRpc = schemaContext.getOperations(); final List rpcs = new ArrayList<>(); for (final RpcDefinition rpcDef : currentlySupportedRpc) { rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath())); } - rpcListener.onRpcAvailable(rpcs); + + if (!rpcs.isEmpty()) { + rpcListener.onRpcAvailable(rpcs); + } } @Override protected void handleReceive(final Object message) throws Exception { - if(message instanceof UpdateSchemaContext) { - updateSchemaContext((UpdateSchemaContext) message); - } - + if (message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); + } } private void updateSchemaContext(final UpdateSchemaContext message) { - schemaContext = message.getSchemaContext(); - rpcBroker.tell(message, ActorRef.noSender()); + schemaContext = message.getSchemaContext(); + registerRoutedRpcDelegate(); + rpcBroker.tell(message, ActorRef.noSender()); } @Override public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(10, Duration.create("1 minute"), - new Function() { - @Override - public SupervisorStrategy.Directive apply(final Throwable t) { - return SupervisorStrategy.resume(); - } - } - ); + return new OneForOneStrategy(10, Duration.create("1 minute"), (Function) t -> { + LOG.error("An exception happened actor will be resumed", t); + + return SupervisorStrategy.resume(); + }); } }