import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Provider, //
AutoCloseable {
+
+
private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
- @SuppressWarnings( "deprecation")
+ @SuppressWarnings("deprecation")
private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
.builder().toInstance();
private final static Method EQUALS_METHOD;
-
-
+
private BindingIndependentMappingService mappingService;
private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
private DataProviderService baDataService;
- private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
- private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
- private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
- private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
+ private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
+ private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
// private ListenerRegistration<BindingToDomRpcForwardingManager>
// bindingToDomRpcManager;
- private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
+ private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
@Override
public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
private RpcProviderRegistryImpl baRpcRegistryImpl;
private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
-
-
+
+ private NotificationProviderService baNotifyService;
+
+ private NotificationPublishService domNotificationService;
+
static {
try {
- EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
+ EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
}
- if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
+ if (biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
}
rpcForwarding = true;
public void startNotificationForwarding() {
checkState(!notificationForwarding, "Connector is already forwarding notifications.");
- notificationForwarding = true;
+ if (baNotifyService != null && domNotificationService != null) {
+ baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
+
+ notificationForwarding = true;
+ }
}
protected void setMappingService(BindingIndependentMappingService mappingService) {
private class BindingToDomTransaction implements
DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
- private DataModificationTransaction backing;
- private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
+ private final DataModificationTransaction backing;
+ private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
public BindingToDomTransaction(DataModificationTransaction backing,
DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
}
private class DomToBindingCommitHandler implements //
- RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject>>, //
+ RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
@Override
- public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
+ public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
.getPath());
}
@Override
- public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<?>, DataObject> registration) {
+ public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
// NOOP for now
// FIXME: do registration based on only active commit handlers.
}
+ @Override
public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
Object identifier = domTransaction.getIdentifier();
/**
* Manager responsible for instantiating forwarders responsible for
* forwarding of RPC invocations from DOM Broker to Binding Aware Broker
- *
+ *
*/
private class DomToBindingRpcForwardingManager implements
RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
this.registryImpl = registryImpl;
}
-
+
@Override
public void onGlobalRpcRegistered(Class<? extends RpcService> cls) {
getRpcForwarder(cls, null);
}
-
+
@Override
public void onGlobalRpcUnregistered(Class<? extends RpcService> cls) {
// NOOP
}
-
+
@Override
public void onRpcRouterCreated(RpcRouter<?> router) {
Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
private final Set<QName> supportedRpcs;
private final WeakReference<Class<? extends RpcService>> rpcServiceType;
- private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
- private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
- private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
+ private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+ private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+ private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
}
} catch (Exception e) {
- LOG.error("Could not forward Rpcs of type {}", service.getName());
+ LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
}
registrations = ImmutableSet.of();
}
/**
* Constructor for Routed RPC Forwareder.
- *
+ *
* @param service
* @param context
*/
}
createDefaultDomForwarder();
} catch (Exception e) {
- LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
+ LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
}
registrations = registrationsBuilder.build();
}
}
}
-
+
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- if(EQUALS_METHOD.equals(method)) {
+ if (EQUALS_METHOD.equals(method)) {
return false;
}
RpcInvocationStrategy strategy = strategiesByMethod.get(method);
checkState(strategy != null);
checkArgument(args.length <= 2);
- if(args.length == 1) {
+ if (args.length == 1) {
checkArgument(args[0] instanceof DataObject);
return strategy.forwardToDomBroker((DataObject) args[0]);
}
Class<?> cls = rpcServiceType.get();
ClassLoader clsLoader = cls.getClassLoader();
RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
-
+
RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
rpcRouter.registerDefaultService(proxy);
}
RpcInvocationStrategy strategy = null;
if (outputClass.isPresent()) {
if (inputClass.isPresent()) {
- strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
+ strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
+ .get());
} else {
- strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
+ strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
}
+ } else if(inputClass.isPresent()){
+ strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
} else {
- strategy = null;
+ strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
}
return strategy;
}
protected final Method targetMethod;
protected final QName rpc;
- public RpcInvocationStrategy(QName rpc,Method targetMethod) {
+ public RpcInvocationStrategy(QName rpc, Method targetMethod) {
this.targetMethod = targetMethod;
this.rpc = rpc;
}
private class DefaultInvocationStrategy extends RpcInvocationStrategy {
@SuppressWarnings("rawtypes")
- private WeakReference<Class> inputClass;
+ private final WeakReference<Class> inputClass;
@SuppressWarnings("rawtypes")
- private WeakReference<Class> outputClass;
+ private final WeakReference<Class> outputClass;
@SuppressWarnings({ "rawtypes", "unchecked" })
public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
Class<? extends DataContainer> inputClass) {
- super(rpc,targetMethod);
+ super(rpc, targetMethod);
this.outputClass = new WeakReference(outputClass);
this.inputClass = new WeakReference(inputClass);
}
+ @SuppressWarnings("unchecked")
@Override
public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
- Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
- if (result == null) {
+ Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
+ if (futureResult == null) {
return Rpcs.getRpcResult(false);
}
- RpcResult<?> bindingResult = result.get();
+ RpcResult<?> bindingResult = futureResult.get();
+ final Object resultObj = bindingResult.getResult();
+ if (resultObj instanceof DataObject) {
+ final CompositeNode output = mappingService.toDataDom((DataObject)resultObj);
+ return Rpcs.getRpcResult(true, output, Collections.<RpcError>emptySet());
+ }
return Rpcs.getRpcResult(true);
}
-
+
@Override
public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
- if(biRouter != null) {
+ if(biRouter != null) {
CompositeNode xml = mappingService.toDataDom(input);
- CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
Object baResultValue = null;
- if(result.getResult() != null) {
+ if (result.getResult() != null) {
baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
}
RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
- return Futures.<RpcResult<?>>immediateFuture(baResult);
+ return Futures.<RpcResult<?>> immediateFuture(baResult);
}
- return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+ return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
}
}
private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
- super(rpc,targetMethod);
+ super(rpc, targetMethod);
}
+ @Override
public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
@SuppressWarnings("unchecked")
Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
RpcResult<Void> bindingResult = result.get();
return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
}
-
+
@Override
public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
return Futures.immediateFuture(null);
}
}
+ private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
+
+
+ @SuppressWarnings("rawtypes")
+ private final WeakReference<Class> inputClass;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
+ Class<? extends DataContainer> inputClass) {
+ super(rpc,targetMethod);
+ this.inputClass = new WeakReference(inputClass);
+ }
+
+
+ @Override
+ public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
+ DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
+ Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
+ if (result == null) {
+ return Rpcs.getRpcResult(false);
+ }
+ RpcResult<?> bindingResult = result.get();
+ return Rpcs.getRpcResult(true);
+ }
+
+ @Override
+ public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
+ if(biRouter != null) {
+ CompositeNode xml = mappingService.toDataDom(input);
+ CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
+ RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+ Object baResultValue = null;
+ RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
+ return Futures.<RpcResult<?>>immediateFuture(baResult);
+ }
+ return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
+ }
+
+ }
+
public boolean isRpcForwarding() {
return rpcForwarding;
}
}
public boolean isNotificationForwarding() {
- // TODO Auto-generated method stub
return notificationForwarding;
}
public BindingIndependentMappingService getMappingService() {
return mappingService;
}
+
+ public void setBindingNotificationService(NotificationProviderService baService) {
+ this.baNotifyService = baService;
+
+ }
+
+ public void setDomNotificationService(NotificationPublishService domService) {
+ this.domNotificationService = domService;
+ }
+
+ private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
+
+ private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
+ private final Set<QName> supportedNotifications = new HashSet<>();
+
+ @Override
+ public Set<QName> getSupportedNotifications() {
+ return Collections.unmodifiableSet(supportedNotifications);
+ }
+
+ @Override
+ public void onNotification(CompositeNode notification) {
+ QName qname = notification.getNodeType();
+ WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
+ if (potential != null) {
+ Class<? extends Notification> potentialClass = potential.get();
+ if (potentialClass != null) {
+ final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
+ notification);
+
+ if (baNotification instanceof Notification) {
+ baNotifyService.publish((Notification) baNotification);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onNotificationSubscribtion(Class<? extends Notification> notificationType) {
+ QName qname = BindingReflections.findQName(notificationType);
+ if (qname != null) {
+ WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
+ new WeakReference<Class<? extends Notification>>(notificationType));
+ if (already == null) {
+ domNotificationService.addNotificationListener(qname, this);
+ supportedNotifications.add(qname);
+ }
+ }
+ }
+ }
}