package org.opendaylight.controller.remote.rpc;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
-import org.opendaylight.controller.remote.rpc.registry.ClusterWrapperImpl;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import akka.actor.PoisonPill;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Set;
-
/**
* This is the base class which initialize all the actors, listeners and
* default RPc implementation so remote invocation of rpcs.
*/
-public class RemoteRpcProvider implements AutoCloseable, Provider{
-
- private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
-
- private final ActorSystem actorSystem;
- private ActorRef rpcBroker;
- private ActorRef rpcRegistry;
- private final RpcProvisionRegistry rpcProvisionRegistry;
- private Broker.ProviderSession brokerSession;
- private RpcListener rpcListener;
- private RoutedRpcListener routeChangeListener;
- private RemoteRpcImplementation rpcImplementation;
- public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
- this.actorSystem = actorSystem;
- this.rpcProvisionRegistry = rpcProvisionRegistry;
- }
-
- @Override
- public void close() throws Exception {
- this.actorSystem.shutdown();
- unregisterSupportedRpcs();
- unregisterSupportedRoutedRpcs();
- }
-
- @Override
- public void onSessionInitiated(Broker.ProviderSession session) {
- this.brokerSession = session;
- start();
- }
+public class RemoteRpcProvider implements AutoCloseable, Provider {
- @Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return null;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
- private void start() {
- LOG.debug("Starting all rpc listeners.");
- // Create actor to handle and sync routing table in cluster
- ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
- rpcRegistry = actorSystem.actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry");
+ private final DOMRpcProviderService rpcProvisionRegistry;
+ private final RemoteRpcProviderConfig config;
+ private final ActorSystem actorSystem;
- // Create actor to invoke and execute rpc
- SchemaService schemaService = brokerSession.getService(SchemaService.class);
- SchemaContext schemaContext = schemaService.getGlobalContext();
- rpcBroker = actorSystem.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker");
- String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc-broker";
- rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
- routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
- brokerSession.addRpcRegistrationListener(rpcListener);
- rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
- rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
- announceSupportedRpcs();
- announceSupportedRoutedRpcs();
+ private DOMRpcService rpcService;
+ private SchemaService schemaService;
+ private ActorRef rpcManager;
- }
-
- /**
- * Add all the locally registered RPCs in the clustered routing table
- */
- private void announceSupportedRpcs(){
- LOG.debug("Adding all supported rpcs to routing table");
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- rpcListener.onRpcImplementationAdded(rpc);
+ public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
+ final RemoteRpcProviderConfig config) {
+ this.actorSystem = actorSystem;
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = Preconditions.checkNotNull(config);
}
- }
-
- /**
- * Add all the locally registered Routed RPCs in the clustered routing table
- */
- private void announceSupportedRoutedRpcs(){
- //TODO: announce all routed RPCs as well
+ public void setRpcService(final DOMRpcService rpcService) {
+ this.rpcService = rpcService;
+ }
- }
+ public void setSchemaService(final SchemaService schemaService) {
+ this.schemaService = schemaService;
+ }
- /**
- * Un-Register all the supported RPCs from clustered routing table
- */
- private void unregisterSupportedRpcs(){
- LOG.debug("removing all supported rpcs to routing table");
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- rpcListener.onRpcImplementationRemoved(rpc);
+ @Override
+ public void close() {
+ if (rpcManager != null) {
+ LOG.info("Stopping RPC Manager at {}", rpcManager);
+ rpcManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ rpcManager = null;
+ }
}
- }
- /**
- * Un-Register all the locally supported Routed RPCs from clustered routing table
- */
- private void unregisterSupportedRoutedRpcs(){
+ @Override
+ public void onSessionInitiated(final Broker.ProviderSession session) {
+ rpcService = session.getService(DOMRpcService.class);
+ start();
+ }
- //TODO: remove all routed RPCs as well
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ return ImmutableSet.of();
+ }
- }
+ public void start() {
+ LOG.info("Starting Remote RPC service...");
+ rpcManager = actorSystem.actorOf(RpcManager.props(rpcProvisionRegistry, rpcService, config),
+ config.getRpcManagerName());
+ LOG.debug("RPC Manager started at {}", rpcManager);
+ }
}