import akka.japi.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private ActorRef rpcRegistry;
private final RemoteRpcProviderConfig config;
private RpcListener rpcListener;
- private RoutedRpcListener routeChangeListener;
private RemoteRpcImplementation rpcImplementation;
private final DOMRpcProviderService rpcProvisionRegistry;
private final DOMRpcService rpcServices;
LOG.debug("Registers rpc listeners");
rpcListener = new RpcListener(rpcRegistry);
- routeChangeListener = new RoutedRpcListener(rpcRegistry);
rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
rpcServices.registerRpcListener(rpcListener);
-// rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
-// rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+ registerRoutedRpcDelegate();
announceSupportedRpcs();
}
+ private void registerRoutedRpcDelegate() {
+ Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
+ Set<Module> modules = schemaContext.getModules();
+ for(Module module : modules){
+ for(RpcDefinition rpcDefinition : module.getRpcs()){
+ if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
+ LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
+ rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
+ }
+ }
+ }
+ rpcProvisionRegistry.registerRpcImplementation(rpcImplementation, rpcIdentifiers);
+ }
+
/**
* Add all the locally registered RPCs in the clustered routing table
*/
private void updateSchemaContext(final UpdateSchemaContext message) {
schemaContext = message.getSchemaContext();
+ registerRoutedRpcDelegate();
rpcBroker.tell(message, ActorRef.noSender());
}
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(final Throwable t) {
+ LOG.error("An exception happened actor will be resumed", t);
+
return SupervisorStrategy.resume();
}
}