import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
-import akka.japi.Creator;
import akka.japi.Function;
+import java.util.Set;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
-import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
-import java.util.Set;
/**
* This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
private SchemaContext schemaContext;
- private final ClusterWrapper clusterWrapper;
private ActorRef rpcBroker;
private ActorRef rpcRegistry;
private final Broker.ProviderSession brokerSession;
+ private final RemoteRpcProviderConfig config;
private RpcListener rpcListener;
private RoutedRpcListener routeChangeListener;
private RemoteRpcImplementation rpcImplementation;
private final RpcProvisionRegistry rpcProvisionRegistry;
- private RpcManager(ClusterWrapper clusterWrapper, SchemaContext schemaContext,
- Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
- this.clusterWrapper = clusterWrapper;
+ private RpcManager(SchemaContext schemaContext,
+ Broker.ProviderSession brokerSession,
+ RpcProvisionRegistry rpcProvisionRegistry) {
this.schemaContext = schemaContext;
this.brokerSession = brokerSession;
this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
createRpcActors();
startListeners();
}
- public static Props props(final ClusterWrapper clusterWrapper, final SchemaContext schemaContext,
- final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
- return Props.create(new Creator<RpcManager>() {
- @Override
- public RpcManager create() throws Exception {
- return new RpcManager(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry);
- }
- });
- }
+ public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
+ final RpcProvisionRegistry rpcProvisionRegistry) {
+ return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
+ }
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
- rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry");
- rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker");
+ rpcRegistry =
+ getContext().actorOf(Props.create(RpcRegistry.class).
+ withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+
+ rpcBroker =
+ getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
+ withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+
+ RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
+ rpcRegistry.tell(localRouter, self());
}
private void startListeners() {
LOG.debug("Registers rpc listeners");
- String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc/rpc-broker";
- rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
- routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+ rpcListener = new RpcListener(rpcRegistry);
+ routeChangeListener = new RoutedRpcListener(rpcRegistry);
+ rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
brokerSession.addRpcRegistrationListener(rpcListener);
rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
private void updateSchemaContext(UpdateSchemaContext message) {
this.schemaContext = message.getSchemaContext();
+ rpcBroker.tell(message, ActorRef.noSender());
}
@Override