X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FRpcManager.java;h=4cbce63f9aa2a9b9f44f4c1ba3a27a225d4c2d42;hp=5c56455bd0c208a40709b4a221fb6a22ed0b65a1;hb=d58f9f17f0f552105fa2c01960991f484b51e733;hpb=9c3a7d1aa13908ee5f0be33b63dfd2467af551be diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 5c56455bd0..4cbce63f9a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -13,10 +13,10 @@ import akka.actor.ActorRef; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; -import akka.japi.Creator; import akka.japi.Function; +import java.util.Set; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; -import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; @@ -25,7 +25,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.util.Set; /** * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. @@ -38,51 +37,54 @@ public class RpcManager extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class); private SchemaContext schemaContext; - private final ClusterWrapper clusterWrapper; private ActorRef rpcBroker; private ActorRef rpcRegistry; private final Broker.ProviderSession brokerSession; + private final RemoteRpcProviderConfig config; private RpcListener rpcListener; private RoutedRpcListener routeChangeListener; private RemoteRpcImplementation rpcImplementation; private final RpcProvisionRegistry rpcProvisionRegistry; - private RpcManager(ClusterWrapper clusterWrapper, SchemaContext schemaContext, - Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) { - this.clusterWrapper = clusterWrapper; + private RpcManager(SchemaContext schemaContext, + Broker.ProviderSession brokerSession, + RpcProvisionRegistry rpcProvisionRegistry) { this.schemaContext = schemaContext; this.brokerSession = brokerSession; this.rpcProvisionRegistry = rpcProvisionRegistry; + this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); createRpcActors(); startListeners(); } - public static Props props(final ClusterWrapper clusterWrapper, final SchemaContext schemaContext, - final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) { - return Props.create(new Creator() { - @Override - public RpcManager create() throws Exception { - return new RpcManager(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry); - } - }); - } + public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession, + final RpcProvisionRegistry rpcProvisionRegistry) { + return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry); + } private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); - rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY); - rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER); + rpcRegistry = + getContext().actorOf(Props.create(RpcRegistry.class). + withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); + + rpcBroker = + getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext). + withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); + + RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); + rpcRegistry.tell(localRouter, self()); } private void startListeners() { LOG.debug("Registers rpc listeners"); - String rpcBrokerPath = clusterWrapper.getAddress().toString() + ActorConstants.RPC_BROKER_PATH; - rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath); - routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath); - rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext); + rpcListener = new RpcListener(rpcRegistry); + routeChangeListener = new RoutedRpcListener(rpcRegistry); + rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config); brokerSession.addRpcRegistrationListener(rpcListener); rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); @@ -112,6 +114,7 @@ public class RpcManager extends AbstractUntypedActor { private void updateSchemaContext(UpdateSchemaContext message) { this.schemaContext = message.getSchemaContext(); + rpcBroker.tell(message, ActorRef.noSender()); } @Override