import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
+import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
+import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
import org.opendaylight.yangtools.yang.binding.Augmentable;
import org.opendaylight.yangtools.yang.binding.Augmentation;
import org.opendaylight.yangtools.yang.binding.BaseIdentity;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
public class BindingIndependentConnector implements //
RuntimeDataProvider, //
Provider, //
AutoCloseable {
-
-
private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
- @SuppressWarnings("deprecation")
- private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
-
private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
.builder().toInstance();
};
- private Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> baDataReaderRegistration;
-
private boolean rpcForwarding = false;
private boolean dataForwarding = false;
private DataModificationTransaction createBindingToDomTransaction(
final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
DataModificationTransaction target = biDataService.beginTransaction();
- LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
+ LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier());
for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
target.removeConfigurationData(biEntry);
- LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
+ LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry);
}
for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
target.removeOperationalData(biEntry);
- LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
+ LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry);
}
for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
.entrySet()) {
Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
.toDataDom(entry);
target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
- LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
+ LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry);
}
for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
.entrySet()) {
Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
.toDataDom(entry);
target.putOperationalData(biEntry.getKey(), biEntry.getValue());
- LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
+ LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry);
}
return target;
return biDataService;
}
- protected void setDomDataService(final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
+ protected void setDomDataService(
+ final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
this.biDataService = biDataService;
}
}
public void startDataForwarding() {
- if(baDataService instanceof AbstractForwardedDataBroker) {
+ if (baDataService instanceof AbstractForwardedDataBroker) {
dataForwarding = true;
return;
}
- checkState(!dataForwarding, "Connector is already forwarding data.");
- baDataReaderRegistration = baDataService.registerDataReader(ROOT, this);
- baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler);
- biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
- baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
+
+ final DataProviderService baData;
+ if (baDataService instanceof BindingMountPointImpl) {
+ baData = ((BindingMountPointImpl) baDataService).getDataBrokerImpl();
+ LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService);
+ } else {
+ baData = baDataService;
+ }
+
+ if (baData instanceof DataBrokerImpl) {
+ checkState(!dataForwarding, "Connector is already forwarding data.");
+ ((DataBrokerImpl) baData).setDataReadDelegate(this);
+ ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler);
+ biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
+ baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
+ }
+
dataForwarding = true;
}
public void startRpcForwarding() {
- if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
+ if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
checkState(!rpcForwarding, "Connector is already forwarding RPCs");
domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
}
DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
- LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
- domTransaction.getIdentifier());
+ LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .",
+ bindingTransaction.getIdentifier(), domTransaction.getIdentifier());
return wrapped;
}
}
DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
@Override
- public void onRegister(final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
+ public void onRegister(
+ final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
.getPath());
}
@Override
- public void onUnregister(final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
+ public void onUnregister(
+ final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
// NOOP for now
// FIXME: do registration based on only active commit handlers.
}
*
*/
private class DomToBindingRpcForwardingManager implements
- RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
- RouterInstantiationListener,
+ RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>, RouterInstantiationListener,
GlobalRpcRegistrationListener {
private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
}
} catch (Exception e) {
- LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
+ LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
}
registrations = ImmutableSet.of();
}
* @param service
* @param context
*/
- public DomToBindingRpcForwarder(final Class<? extends RpcService> service, final Class<? extends BaseIdentity> context) {
+ public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
+ final Class<? extends BaseIdentity> context) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
this.supportedRpcs = mappingService.getRpcQNamesFor(service);
Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
registrations = registrationsBuilder.build();
}
- public void registerPaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
- final Set<InstanceIdentifier<?>> set) {
+ public void registerPaths(final Class<? extends BaseIdentity> context,
+ final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
QName ctx = BindingReflections.findQName(context);
for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
toDOMInstanceIdentifier)) {
}
}
-
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
if (EQUALS_METHOD.equals(method)) {
}
@Override
- public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
checkArgument(rpc != null);
checkArgument(domInput != null);
RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
checkState(rpcService != null);
CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+
try {
- return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
+ return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
} catch (Exception e) {
- throw new IllegalStateException(e);
+ return Futures.immediateFailedFuture(e);
}
}
strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
.get());
} else {
- strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
+ strategy = new NoInputInvocationStrategy(rpc, targetMethod, outputClass.get());
}
- } else if(inputClass.isPresent()){
- strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
+ } else if (inputClass.isPresent()) {
+ strategy = new NoOutputInvocationStrategy(rpc, targetMethod, inputClass.get());
} else {
- strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
+ strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
}
return strategy;
}
public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
throws Exception;
- public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+ public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput)
+ throws Exception {
return uncheckedInvoke(rpcService, domInput);
}
}
@SuppressWarnings("unchecked")
@Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+ public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+ throws Exception {
DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
if (futureResult == null) {
RpcResult<?> bindingResult = futureResult.get();
final Object resultObj = bindingResult.getResult();
if (resultObj instanceof DataObject) {
- final CompositeNode output = mappingService.toDataDom((DataObject)resultObj);
- return Rpcs.getRpcResult(true, output, Collections.<RpcError>emptySet());
+ final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
+ return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
+ }
+ return Rpcs.getRpcResult(true);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+ if (biRpcRegistry == null) {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
+ }
+
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
+ new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
+ Object baResultValue = null;
+ if (input.getResult() != null) {
+ baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
+ input.getResult());
+ }
+ return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+ }
+ });
+ }
+ }
+
+ private class NoInputInvocationStrategy extends RpcInvocationStrategy {
+
+ @SuppressWarnings("rawtypes")
+ private final WeakReference<Class> outputClass;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public NoInputInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass) {
+ super(rpc, targetMethod);
+ this.outputClass = new WeakReference(outputClass);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+ throws Exception {
+ Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService);
+ if (futureResult == null) {
+ return Rpcs.getRpcResult(false);
+ }
+ RpcResult<?> bindingResult = futureResult.get();
+ final Object resultObj = bindingResult.getResult();
+ if (resultObj instanceof DataObject) {
+ final CompositeNode output = mappingService.toDataDom((DataObject) resultObj);
+ return Rpcs.getRpcResult(true, output, Collections.<RpcError> emptySet());
}
return Rpcs.getRpcResult(true);
}
@Override
public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if(biRpcRegistry != null) {
+ if (biRpcRegistry != null) {
CompositeNode xml = mappingService.toDataDom(input);
CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
- RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
- Object baResultValue = null;
- if (result.getResult() != null) {
- baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
- }
- RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
- return Futures.<RpcResult<?>> immediateFuture(baResult);
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
+ new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
+ Object baResultValue = null;
+ if (input.getResult() != null) {
+ baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(),
+ input.getResult());
+ }
+ return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors());
+ }
+ });
+ } else {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
- return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
-
}
private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
}
@Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+ public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+ throws Exception {
@SuppressWarnings("unchecked")
Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
RpcResult<Void> bindingResult = result.get();
private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
-
@SuppressWarnings("rawtypes")
private final WeakReference<Class> inputClass;
@SuppressWarnings({ "rawtypes", "unchecked" })
public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod,
final Class<? extends DataContainer> inputClass) {
- super(rpc,targetMethod);
+ super(rpc, targetMethod);
this.inputClass = new WeakReference(inputClass);
}
-
@Override
- public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
+ public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput)
+ throws Exception {
DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
if (result == null) {
}
@Override
- public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
- if(biRpcRegistry != null) {
- CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
- RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
- Object baResultValue = null;
- RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
- return Futures.<RpcResult<?>>immediateFuture(baResult);
+ public ListenableFuture<RpcResult<?>> forwardToDomBroker(final DataObject input) {
+ if (biRpcRegistry == null) {
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
- return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
- }
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
+
+ return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml),
+ new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @Override
+ public RpcResult<?> apply(final RpcResult<CompositeNode> input) {
+ return Rpcs.<Void> getRpcResult(input.isSuccessful(), null, input.getErrors());
+ }
+ });
+ }
}
public boolean isRpcForwarding() {