import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration.CompositeObjectRegistrationBuilder;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.Augmentable;
import org.opendaylight.yangtools.yang.binding.Augmentation;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.util.concurrent.Futures;
public void startRpcForwarding() {
if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
checkState(!rpcForwarding, "Connector is already forwarding RPCs");
- domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
+ final DomToBindingRpcForwardingManager biFwdManager = new DomToBindingRpcForwardingManager();
+
+ domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(biFwdManager);
+ biRpcRegistry.addRpcRegistrationListener(biFwdManager);
if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
public void onRegister(
final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
- org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
+ mappingService.toDataDom(registration
.getPath());
}
*/
private class DomToBindingRpcForwardingManager implements
RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>, RouterInstantiationListener,
- GlobalRpcRegistrationListener {
+ GlobalRpcRegistrationListener, RpcRegistrationListener {
private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
private RpcProviderRegistryImpl registryImpl;
@Override
public void onGlobalRpcRegistered(final Class<? extends RpcService> cls) {
- getRpcForwarder(cls, null);
+ getRpcForwarder(cls, null).registerToDOMBroker();
}
@Override
return potential;
}
+ @Override
+ public void onRpcImplementationAdded(final QName name) {
+
+ final Optional<Class<? extends RpcService>> rpcInterface = mappingService.getRpcServiceClassFor(
+ name.getNamespace().toString(), name.getFormattedRevision());
+ if (rpcInterface.isPresent()) {
+ getRpcForwarder(rpcInterface.get(), null).registerToBindingBroker();
+ }
+ }
+
+ @Override
+ public void onRpcImplementationRemoved(final QName name) {
+
+ }
}
private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
private final Set<QName> supportedRpcs;
private final WeakReference<Class<? extends RpcService>> rpcServiceType;
- private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+ private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
+ private final RpcService proxy;
+ private ObjectRegistration<?> forwarderRegistration;
+ private boolean registrationInProgress = false;
public DomToBindingRpcForwarder(final Class<? extends RpcService> service) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
this.supportedRpcs = mappingService.getRpcQNamesFor(service);
- try {
- for (QName rpc : supportedRpcs) {
- RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
- strategiesByMethod.put(strategy.targetMethod, strategy);
- strategiesByQName.put(rpc, strategy);
- biRpcRegistry.addRpcImplementation(rpc, this);
- }
- } catch (Exception e) {
- LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
- }
- registrations = ImmutableSet.of();
+ Class<?> cls = rpcServiceType.get();
+ ClassLoader clsLoader = cls.getClassLoader();
+ proxy =(RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
+ createStrategies();
}
/**
* @param context
*/
public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
- final Class<? extends BaseIdentity> context) {
- this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
- this.supportedRpcs = mappingService.getRpcQNamesFor(service);
+ final Class<? extends BaseIdentity> context) {
+ this(service);
Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
.<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
try {
for (QName rpc : supportedRpcs) {
- RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
- strategiesByMethod.put(strategy.targetMethod, strategy);
- strategiesByQName.put(rpc, strategy);
registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
}
createDefaultDomForwarder();
registrations = registrationsBuilder.build();
}
+
+
+ private void createStrategies() {
+ try {
+ for (QName rpc : supportedRpcs) {
+ RpcInvocationStrategy strategy = createInvocationStrategy(rpc, rpcServiceType.get());
+ strategiesByMethod.put(strategy.targetMethod, strategy);
+ strategiesByQName.put(rpc, strategy);
+ }
+ } catch (Exception e) {
+ LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e);
+ }
+
+ }
+
+ /**
+ * Registers RPC Forwarder to DOM Broker,
+ * this means Binding Aware Broker has implementation of RPC
+ * which is registered to it.
+ *
+ * If RPC Forwarder was previously registered to DOM Broker
+ * or to Bidning Broker this method is noop to prevent
+ * creating forwarding loop.
+ *
+ */
+ public void registerToDOMBroker() {
+ if(!registrationInProgress && forwarderRegistration == null) {
+ registrationInProgress = true;
+ CompositeObjectRegistrationBuilder<DomToBindingRpcForwarder> builder = CompositeObjectRegistration.builderFor(this);
+ try {
+ for (QName rpc : supportedRpcs) {
+ builder.add(biRpcRegistry.addRpcImplementation(rpc, this));
+ }
+ } catch (Exception e) {
+ LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e);
+ }
+ this.forwarderRegistration = builder.toInstance();
+ registrationInProgress = false;
+ }
+ }
+
+
public void registerPaths(final Class<? extends BaseIdentity> context,
final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
QName ctx = BindingReflections.findQName(context);
}
}
checkState(targetMethod != null, "Rpc method not found");
- Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
- Optional<Class<? extends DataContainer>> inputClass = BindingReflections
- .resolveRpcInputClass(targetMethod);
-
- RpcInvocationStrategy strategy = null;
- if (outputClass.isPresent()) {
- if (inputClass.isPresent()) {
- strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
- .get());
- } else {
- strategy = new NoInputInvocationStrategy(rpc, targetMethod, outputClass.get());
- }
- } else if (inputClass.isPresent()) {
- strategy = new NoOutputInvocationStrategy(rpc, targetMethod, inputClass.get());
- } else {
- strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
- }
- return strategy;
+ return new RpcInvocationStrategy(rpc,targetMethod, mappingService, biRpcRegistry);
}
});
}
- }
-
- private abstract class RpcInvocationStrategy {
-
- protected final Method targetMethod;
- protected final QName rpc;
-
- public RpcInvocationStrategy(final QName rpc, final Method targetMethod) {
- this.targetMethod = targetMethod;
- this.rpc = rpc;
- }
-
- public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
-
- public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
- throws Exception;
-
- public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput)
- throws Exception {
- return uncheckedInvoke(rpcService, domInput);
- }
- }
-
- private class DefaultInvocationStrategy extends RpcInvocationStrategy {
-
- @SuppressWarnings("rawtypes")
- private final WeakReference<Class> inputClass;
-
- @SuppressWarnings("rawtypes")
- private final WeakReference<Class> outputClass;
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public DefaultInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass,
- final Class<? extends DataContainer> inputClass) {
- super(rpc, targetMethod);
- this.outputClass = new WeakReference(outputClass);
- this.inputClass = new WeakReference(inputClass);
- }
- @SuppressWarnings("unchecked")
- @Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
- throws Exception {
- DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
- Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
- if (futureResult == null) {
- return Rpcs.getRpcResult(false);
- }
- RpcResult<?> bindingResult = futureResult.get();
- final Object resultObj = bindingResult.getResult();
- if (resultObj instanceof DataObject) {
- final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
- return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
- }
- return Rpcs.getRpcResult(true);
- }
-
- @Override
- public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if (biRpcRegistry == null) {
- return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
- }
-
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
-
- return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
- new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
- @Override
- public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
- Object baResultValue = null;
- if (input.getResult() != null) {
- baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
- input.getResult());
- }
- return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
- }
- });
- }
- }
-
- private class NoInputInvocationStrategy extends RpcInvocationStrategy {
-
- @SuppressWarnings("rawtypes")
- private final WeakReference<Class> outputClass;
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public NoInputInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass) {
- super(rpc, targetMethod);
- this.outputClass = new WeakReference(outputClass);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
- throws Exception {
- Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService);
- if (futureResult == null) {
- return Rpcs.getRpcResult(false);
- }
- RpcResult<?> bindingResult = futureResult.get();
- final Object resultObj = bindingResult.getResult();
- if (resultObj instanceof DataObject) {
- final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
- return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
- }
- return Rpcs.getRpcResult(true);
- }
-
- @Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if (biRpcRegistry != null) {
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
- return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
- new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
- @Override
- public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
- Object baResultValue = null;
- if (input.getResult() != null) {
- baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
- input.getResult());
- }
- return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
- }
- });
- } else {
- return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
- }
- }
- }
-
- private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
-
- public NoInputNoOutputInvocationStrategy(final QName rpc, final Method targetMethod) {
- super(rpc, targetMethod);
- }
-
- @Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
- throws Exception {
- @SuppressWarnings("unchecked")
- Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
- RpcResult<Void> bindingResult = result.get();
- return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
- }
-
- @Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- return Futures.immediateFuture(null);
- }
- }
-
- private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
-
- @SuppressWarnings("rawtypes")
- private final WeakReference<Class> inputClass;
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod,
- final Class<? extends DataContainer> inputClass) {
- super(rpc, targetMethod);
- this.inputClass = new WeakReference(inputClass);
- }
-
- @Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
- throws Exception {
- DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
- Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
- if (result == null) {
- return Rpcs.getRpcResult(false);
- }
- RpcResult<?> bindingResult = result.get();
- return Rpcs.getRpcResult(true);
- }
-
- @Override
- public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if (biRpcRegistry == null) {
- return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
+ /**
+ * Registers RPC Forwarder to Binding Broker,
+ * this means DOM Broekr has implementation of RPC
+ * which is registered to it.
+ *
+ * If RPC Forwarder was previously registered to DOM Broker
+ * or to Bidning Broker this method is noop to prevent
+ * creating forwarding loop.
+ *
+ */
+ public void registerToBindingBroker() {
+ if(!registrationInProgress && forwarderRegistration == null) {
+ try {
+ registrationInProgress = true;
+ this.forwarderRegistration = baRpcRegistry.addRpcImplementation((Class)rpcServiceType.get(), proxy);
+ } catch (Exception e) {
+ LOG.error("Unable to forward RPCs for {}",rpcServiceType.get(),e);
+ } finally {
+ registrationInProgress = false;
+ }
}
-
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
-
- return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
- new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
- @Override
- public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
- return Rpcs.<Void> getRpcResult(input.isSuccessful(), null, input.getErrors());
- }
- });
}
}