- private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
- private SchemaContext schemaContext;
- private ActorRef rpcBroker;
- private ActorRef rpcRegistry;
- private final Broker.ProviderSession brokerSession;
- private RpcListener rpcListener;
- private RoutedRpcListener routeChangeListener;
- private RemoteRpcImplementation rpcImplementation;
- private final RpcProvisionRegistry rpcProvisionRegistry;
-
- private RpcManager(SchemaContext schemaContext,
- Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
- this.schemaContext = schemaContext;
- this.brokerSession = brokerSession;
- this.rpcProvisionRegistry = rpcProvisionRegistry;
-
- createRpcActors();
- startListeners();
- }
-
-
- public static Props props(final SchemaContext schemaContext,
- final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
- return Props.create(new Creator<RpcManager>() {
- @Override
- public RpcManager create() throws Exception {
- return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
- }
- });
- }
-
- private void createRpcActors() {
- LOG.debug("Create rpc registry and broker actors");
-
-
- rpcRegistry = getContext().actorOf(Props.create(RpcRegistry.class), ActorConstants.RPC_REGISTRY);
-
- rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
- RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
- rpcRegistry.tell(localRouter, self());
- }
-
- private void startListeners() {
- LOG.debug("Registers rpc listeners");
-
- rpcListener = new RpcListener(rpcRegistry);
- routeChangeListener = new RoutedRpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
-
- brokerSession.addRpcRegistrationListener(rpcListener);
- rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
- rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
- announceSupportedRpcs();
- }
-
- /**
- * Add all the locally registered RPCs in the clustered routing table
- */
- private void announceSupportedRpcs(){
- LOG.debug("Adding all supported rpcs to routing table");
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- rpcListener.onRpcImplementationAdded(rpc);
+
+ public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
+ final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
+ Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+ Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+ Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+ return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);