+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.binding.impl.connect.dom;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
+import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
-import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
+import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
+import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-
-import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.util.concurrent.Futures;
public class BindingIndependentConnector implements //
RuntimeDataProvider, //
private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
+ @SuppressWarnings( "deprecation")
private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
.builder().toInstance();
+ private final static Method EQUALS_METHOD;
+
+
private BindingIndependentMappingService mappingService;
private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
private boolean notificationForwarding = false;
+ private RpcProviderRegistryImpl baRpcRegistryImpl;
+
+ private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
+
+
+ static {
+ try {
+ EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
try {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
CompositeNode result = biDataService.readOperationalData(biPath);
- return potentialAugmentationRead(path,biPath,result);
+ return potentialAugmentationRead(path, biPath, result);
} catch (DeserializationException e) {
throw new IllegalStateException(e);
}
}
- private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result) throws DeserializationException {
+ private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
+ org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result)
+ throws DeserializationException {
Class<? extends DataObject> targetType = path.getTargetType();
if (Augmentation.class.isAssignableFrom(targetType)) {
path = mappingService.fromDataDom(biPath);
try {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
CompositeNode result = biDataService.readConfigurationData(biPath);
- return potentialAugmentationRead(path,biPath,result);
+ return potentialAugmentationRead(path, biPath, result);
} catch (DeserializationException e) {
throw new IllegalStateException(e);
}
private DataModificationTransaction createBindingToDomTransaction(
DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
DataModificationTransaction target = biDataService.beginTransaction();
+ LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
.entrySet()) {
Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
.toDataDom(entry);
target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
+ LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
}
for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
.entrySet()) {
Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
.toDataDom(entry);
target.putOperationalData(biEntry.getKey(), biEntry.getValue());
+ LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
}
for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
target.removeConfigurationData(biEntry);
+ LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
}
for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
target.removeOperationalData(biEntry);
+ LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
}
return target;
}
baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
dataForwarding = true;
}
-
+
public void startRpcForwarding() {
if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
- checkState(!rpcForwarding,"Connector is already forwarding RPCs");
+ checkState(!rpcForwarding, "Connector is already forwarding RPCs");
domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
+ if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
+ baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
+ baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
+ baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
+ }
+ if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
+ biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
+ }
rpcForwarding = true;
}
}
-
+
public void startNotificationForwarding() {
checkState(!notificationForwarding, "Connector is already forwarding notifications.");
notificationForwarding = true;
public void onSessionInitiated(ProviderSession session) {
setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
-
+
}
public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
}
DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
- LOG.info("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
+ LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
domTransaction.getIdentifier());
return wrapped;
}
}
private class DomToBindingCommitHandler implements //
- RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject>>, //
+ RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
@Override
- public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
+ public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
.getPath());
}
@Override
- public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
+ public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
// NOOP for now
// FIXME: do registration based on only active commit handlers.
}
org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
- LOG.info("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
+ LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
baTransaction.getIdentifier());
return forwardedTransaction;
}
}
+ /**
+ * Manager responsible for instantiating forwarders responsible for
+ * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
+ *
+ */
private class DomToBindingRpcForwardingManager implements
- RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>> {
+ RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
+ RouterInstantiationListener,
+ GlobalRpcRegistrationListener {
private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
+ private RpcProviderRegistryImpl registryImpl;
+
+ public RpcProviderRegistryImpl getRegistryImpl() {
+ return registryImpl;
+ }
+
+ public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
+ this.registryImpl = registryImpl;
+ }
+
+ @Override
+ public void onGlobalRpcRegistered(Class<? extends RpcService> cls) {
+ getRpcForwarder(cls, null);
+ }
+
+ @Override
+ public void onGlobalRpcUnregistered(Class<? extends RpcService> cls) {
+ // NOOP
+ }
+
+ @Override
+ public void onRpcRouterCreated(RpcRouter<?> router) {
+ Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
+ getRpcForwarder(router.getServiceType(), ctx);
+ }
@Override
public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
} else {
potential = new DomToBindingRpcForwarder(service, context);
}
+
forwarders.put(service, potential);
return potential;
}
}
- private class DomToBindingRpcForwarder implements RpcImplementation {
+ private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
private final Set<QName> supportedRpcs;
private final WeakReference<Class<? extends RpcService>> rpcServiceType;
private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+ private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+ private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
this.supportedRpcs = mappingService.getRpcQNamesFor(service);
- for (QName rpc : supportedRpcs) {
- biRpcRegistry.addRpcImplementation(rpc, this);
+ try {
+ for (QName rpc : supportedRpcs) {
+ RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
+ strategiesByMethod.put(strategy.targetMethod, strategy);
+ strategiesByQName.put(rpc, strategy);
+ biRpcRegistry.addRpcImplementation(rpc, this);
+ }
+
+ } catch (Exception e) {
+ LOG.error("Could not forward Rpcs of type {}", service.getName());
}
registrations = ImmutableSet.of();
}
+ /**
+ * Constructor for Routed RPC Forwareder.
+ *
+ * @param service
+ * @param context
+ */
public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
this.supportedRpcs = mappingService.getRpcQNamesFor(service);
- registrations = new HashSet<>();
- for (QName rpc : supportedRpcs) {
- registrations.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+ Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
+ .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
+ try {
+ for (QName rpc : supportedRpcs) {
+ RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
+ strategiesByMethod.put(strategy.targetMethod, strategy);
+ strategiesByQName.put(rpc, strategy);
+ registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+ }
+ createDefaultDomForwarder();
+ } catch (Exception e) {
+ LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
}
- registrations = ImmutableSet.copyOf(registrations);
+ registrations = registrationsBuilder.build();
}
public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
}
}
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ if(EQUALS_METHOD.equals(method)) {
+ return false;
+ }
+ RpcInvocationStrategy strategy = strategiesByMethod.get(method);
+ checkState(strategy != null);
+ checkArgument(args.length <= 2);
+ if(args.length == 1) {
+ checkArgument(args[0] instanceof DataObject);
+ return strategy.forwardToDomBroker((DataObject) args[0]);
+ }
+ return strategy.forwardToDomBroker(null);
+ }
+
public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
Set<InstanceIdentifier<?>> set) {
QName ctx = BindingReflections.findQName(context);
return supportedRpcs;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void createDefaultDomForwarder() {
+ if (baRpcRegistryImpl != null) {
+ Class<?> cls = rpcServiceType.get();
+ ClassLoader clsLoader = cls.getClassLoader();
+ RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
+
+ RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
+ rpcRouter.registerDefaultService(proxy);
+ }
+ }
+
@Override
public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
checkArgument(rpc != null);
checkState(rpcService != null);
CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
try {
- return resolveInvocationStrategy(rpc, rpcType).invokeOn(rpcService, domUnwrappedInput);
+ return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
- private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc,
+ private RpcInvocationStrategy resolveInvocationStrategy(QName rpc) {
+ return strategiesByQName.get(rpc);
+ }
+
+ private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
final Class<? extends RpcService> rpcType) throws Exception {
return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
@Override
RpcInvocationStrategy strategy = null;
if (outputClass.isPresent()) {
if (inputClass.isPresent()) {
- strategy = new DefaultInvocationStrategy(targetMethod, outputClass.get(), inputClass.get());
+ strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
} else {
- strategy = new NoInputNoOutputInvocationStrategy(targetMethod);
+ strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
}
} else {
strategy = null;
private abstract class RpcInvocationStrategy {
protected final Method targetMethod;
+ protected final QName rpc;
- public RpcInvocationStrategy(Method targetMethod) {
+ public RpcInvocationStrategy(QName rpc,Method targetMethod) {
this.targetMethod = targetMethod;
+ this.rpc = rpc;
}
+ public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
+
public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
throws Exception;
private WeakReference<Class> outputClass;
@SuppressWarnings({ "rawtypes", "unchecked" })
- public DefaultInvocationStrategy(Method targetMethod, Class<?> outputClass,
+ public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
Class<? extends DataContainer> inputClass) {
- super(targetMethod);
+ super(rpc,targetMethod);
this.outputClass = new WeakReference(outputClass);
this.inputClass = new WeakReference(inputClass);
}
return Rpcs.getRpcResult(true);
}
+ @Override
+ public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+ if(biRouter != null) {
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+ RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+ Object baResultValue = null;
+ if(result.getResult() != null) {
+ baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
+ }
+ RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
+ return Futures.<RpcResult<?>>immediateFuture(baResult);
+ }
+ return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+ }
+
}
private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
- public NoInputNoOutputInvocationStrategy(Method targetMethod) {
- super(targetMethod);
+ public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
+ super(rpc,targetMethod);
}
public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
RpcResult<Void> bindingResult = result.get();
return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
}
+
+ @Override
+ public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+ return Futures.immediateFuture(null);
+ }
}
public boolean isRpcForwarding() {