private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
- private final ActorSystem actorSystem;
private final RpcProvisionRegistry rpcProvisionRegistry;
+
+ private ActorSystem actorSystem;
private Broker.ProviderSession brokerSession;
private SchemaContext schemaContext;
private ActorRef rpcManager;
+ private RemoteRpcProviderConfig config;
public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
this.actorSystem = actorSystem;
this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
}
@Override
public void close() throws Exception {
- this.actorSystem.shutdown();
+ if (this.actorSystem != null)
+ this.actorSystem.shutdown();
}
@Override
}
private void start() {
- LOG.info("Starting all rpc listeners and actors.");
- // Create actor to handle and sync routing table in cluster
+ LOG.info("Starting remote rpc service...");
+
SchemaService schemaService = brokerSession.getService(SchemaService.class);
schemaContext = schemaService.getGlobalContext();
- rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER);
+ rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
+ config.getRpcManagerName());
- LOG.debug("Rpc actors are created.");
+ LOG.debug("rpc manager started");
}
-
@Override
public void onGlobalContextUpdated(SchemaContext schemaContext) {
this.schemaContext = schemaContext;