package org.opendaylight.controller.remote.rpc;
-
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
-import akka.actor.SupervisorStrategy.Directive;
-import akka.japi.Function;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
-import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import scala.concurrent.duration.Duration;
/**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts
- * the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also registers
+ * {@link RpcListener} with the local {@link DOMRpcService}.
*/
-
public class RpcManager extends AbstractUntypedActor {
- private SchemaContext schemaContext;
- private ActorRef rpcBroker;
- private ActorRef rpcRegistry;
- private final RemoteRpcProviderConfig config;
- private RpcListener rpcListener;
- private RemoteRpcImplementation rpcImplementation;
private final DOMRpcProviderService rpcProvisionRegistry;
+ private final RemoteRpcProviderConfig config;
private final DOMRpcService rpcServices;
- private RpcManager(final SchemaContext schemaContext,
- final DOMRpcProviderService rpcProvisionRegistry,
- final DOMRpcService rpcSevices,
- final RemoteRpcProviderConfig config) {
- this.schemaContext = schemaContext;
- this.rpcProvisionRegistry = rpcProvisionRegistry;
- rpcServices = rpcSevices;
- this.config = config;
+ private ListenerRegistration<RpcListener> listenerReg;
+ private ActorRef rpcInvoker;
+ private ActorRef rpcRegistry;
+ private ActorRef rpcRegistrar;
- createRpcActors();
- startListeners();
+ private RpcManager(final DOMRpcProviderService rpcProvisionRegistry,
+ final DOMRpcService rpcServices,
+ final RemoteRpcProviderConfig config) {
+ this.rpcProvisionRegistry = Preconditions.checkNotNull(rpcProvisionRegistry);
+ this.rpcServices = Preconditions.checkNotNull(rpcServices);
+ this.config = Preconditions.checkNotNull(config);
}
-
- public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
- final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
- Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+ public static Props props(final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+ final RemoteRpcProviderConfig config) {
Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
- return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
- }
-
- private void createRpcActors() {
- LOG.debug("Create rpc registry and broker actors");
-
- rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
- .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
-
- rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
- .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
-
- final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
- rpcRegistry.tell(localRouter, self());
+ Preconditions.checkNotNull(config, "RemoteRpcProviderConfig can not be null!");
+ return Props.create(RpcManager.class, rpcProvisionRegistry, rpcServices, config);
}
- private void startListeners() {
- LOG.debug("Registers rpc listeners");
+ @Override
+ public void preStart() throws Exception {
+ super.preStart();
- rpcListener = new RpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcRegistry, config);
+ rpcInvoker = getContext().actorOf(RpcInvoker.props(rpcServices)
+ .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+ LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);
- rpcServices.registerRpcListener(rpcListener);
+ rpcRegistrar = getContext().actorOf(RpcRegistrar.props(config, rpcProvisionRegistry)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistrarName());
+ LOG.debug("Registering remote RPCs with {}", rpcRegistrar);
- registerRoutedRpcDelegate();
- announceSupportedRpcs();
- }
+ rpcRegistry = getContext().actorOf(RpcRegistry.props(config, rpcInvoker, rpcRegistrar)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+ LOG.debug("Propagating RPC information with {}", rpcRegistry);
- private void registerRoutedRpcDelegate() {
- final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
- final Set<Module> modules = schemaContext.getModules();
- for (final Module module : modules) {
- for (final RpcDefinition rpcDefinition : module.getRpcs()) {
- if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
- LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
- rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
- }
- }
- }
- rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+ final RpcListener rpcListener = new RpcListener(rpcRegistry);
+ LOG.debug("Registering local availabitility listener {}", rpcListener);
+ listenerReg = rpcServices.registerRpcListener(rpcListener);
}
- /**
- * Add all the locally registered RPCs in the clustered routing table.
- */
- private void announceSupportedRpcs() {
- LOG.debug("Adding all supported rpcs to routing table");
- final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
- final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
- for (final RpcDefinition rpcDef : currentlySupportedRpc) {
- rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
+ @Override
+ public void postStop() throws Exception {
+ if (listenerReg != null) {
+ listenerReg.close();
+ listenerReg = null;
}
- if (!rpcs.isEmpty()) {
- rpcListener.onRpcAvailable(rpcs);
- }
+ super.postStop();
}
-
@Override
- protected void handleReceive(final Object message) throws Exception {
- if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- }
- }
-
- private void updateSchemaContext(final UpdateSchemaContext message) {
- schemaContext = message.getSchemaContext();
- registerRoutedRpcDelegate();
- rpcBroker.tell(message, ActorRef.noSender());
+ protected void handleReceive(final Object message) {
+ unknownMessage(message);
}
@Override
public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
+ return new OneForOneStrategy(10, Duration.create("1 minute"), t -> {
LOG.error("An exception happened actor will be resumed", t);
-
return SupervisorStrategy.resume();
});
}