<feature version='${project.version}'>odl-netconf-connector</feature>
<feature version='${project.version}'>odl-mdsal-broker</feature>
<bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/messagebus-spi/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
<configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
</feature>
<artifactId>messagebus-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-spi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>messagebus-impl</artifactId>
and is available at http://www.eclipse.org/legal/epl-v10.html
-->
<snapshot>
- <configuration>
+ <configuration>
<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
<module>
- <name>messagebus-app</name>
+ <name>messagebus-app-impl</name>
<type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
<binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
<type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
</namespace-to-stream>
</module>
</modules>
+ <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <service>
+ <type xmlns:mb-esr="urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry">mb-esr:event-source-registry</type>
+ <instance>
+ <name>messagebus-app-impl</name>
+ <provider>/modules/module[type='messagebus-app-impl'][name='messagebus-app-impl']</provider>
+ </instance>
+ </service>
+ </services>
</data>
</configuration>
<required-capabilities>
<capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl?module=messagebus-app-impl&revision=2015-02-03</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry?module=messagebus-event-source-registry&revision=2015-04-02</capability>
</required-capabilities>
</snapshot>
<artifactId>messagebus-api</artifactId>\r
<version>1.2.0-SNAPSHOT</version>\r
</dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>messagebus-spi</artifactId>\r
+ <version>1.2.0-SNAPSHOT</version>\r
+ </dependency>\r
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>sal-netconf-connector</artifactId>\r
-/**
+/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
*/
package org.opendaylight.controller.config.yang.messagebus.app.impl;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.messagebus.app.impl.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageBusAppImplModule extends
- org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+import com.google.common.base.Preconditions;
+
+public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
private BundleContext bundleContext;
@Override
public java.lang.AutoCloseable createInstance() {
- final List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
-
final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
- final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
- final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(dataBroker, domPublish,
- domMount, bindingMount, eventSourceTopology, getNamespaceToStream());
-
- final AutoCloseable closer = new AutoCloseable() {
- @Override
- public void close() {
- eventSourceTopology.close();
- eventSourceManager.close();
- }
- };
+ final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
+ final NetconfEventSourceManager netconfEventSourceManager = NetconfEventSourceManager.create(dataBroker, domPublish,domMount, bindingMount, eventSourceRegistryWrapper, getNamespaceToStream());
+ eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
+ LOGGER.info("Messagebus initialized");
+ return eventSourceRegistryWrapper;
- return closer;
}
- private void closeProvider(final AutoCloseable closable) {
- try {
- closable.close();
- } catch (final Exception e) {
- LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
+ //TODO: separate NetconfEventSource into separate bundle, remove this wrapper, return EventSourceTopology directly as EventSourceRegistry
+ private class EventSourceRegistryWrapper implements EventSourceRegistry{
+
+ private final EventSourceRegistry baseEventSourceRegistry;
+ private final Set<AutoCloseable> autoCloseables = new HashSet<>();
+
+ public EventSourceRegistryWrapper(EventSourceRegistry baseEventSourceRegistry) {
+ this.baseEventSourceRegistry = baseEventSourceRegistry;
}
+
+ public void addAutoCloseable(AutoCloseable ac){
+ Preconditions.checkNotNull(ac);
+ autoCloseables.add(ac);
+ }
+
+ @Override
+ public void close() throws Exception {
+ for(AutoCloseable ac : autoCloseables){
+ ac.close();
+ }
+ baseEventSourceRegistry.close();
+ }
+
+ @Override
+ public <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource) {
+ return this.baseEventSourceRegistry.registerEventSource(eventSource);
+ }
+
}
}
-/**
+/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+import com.google.common.base.Preconditions;
+
+
+class EventSourceRegistrationImpl <T extends EventSource> extends AbstractObjectRegistration<T> implements EventSourceRegistration<T>{
+
+ private final EventSourceTopology eventSourceTopology;
+
+ /**
+ * @param instance of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, EventSource)}
+ */
+ public EventSourceRegistrationImpl(T instance, EventSourceTopology eventSourceTopology) {
+ super(instance);
+ this.eventSourceTopology = Preconditions.checkNotNull(eventSourceTopology);
+ }
+
+ @Override
+ protected void removeRegistration() {
+ this.eventSourceTopology.unRegister(getInstance());
+ }
+
+}
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.regex.Pattern;
+
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
public class EventSourceTopic implements DataChangeListener {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
private final NotificationPattern notificationPattern;
final String regex = Util.wildcardToRegex(nodeIdPattern);
this.nodeIdPattern = Pattern.compile(regex);
-
- // FIXME: We need to perform some salting in order to make
- // the topic IDs less predictable.
- this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern));
+ this.topicId = new TopicId(Util.getUUIDIdent());
}
public TopicId getTopicId() {
}
public void notifyNode(final InstanceIdentifier<?> nodeId) {
+
try {
- sourceService.joinTopic(getJoinTopicInputArgument(nodeId));
+ RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
+ if(rpcResultJoinTopic.isSuccessful() == false){
+ for(RpcError err : rpcResultJoinTopic.getErrors()){
+ LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
+ }
+ }
} catch (final Exception e) {
LOG.error("Could not invoke join topic for node {}", nodeId);
}
return jti;
}
-
}
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
.child(TopologyTypes.class)
.augmentation(TopologyTypes1.class);
- private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+ private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
new ConcurrentHashMap<>();
+ private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
+ new ConcurrentHashMap<>();;
private final DataBroker dataBroker;
private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
-
+ LOG.info("EventSourceRegistry has been initialized");
}
private <T extends DataObject> void putData(final LogicalDatastoreType store,
}
- private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
- final NodeKey nodeKey = node.getKey();
+ private <T extends DataObject> void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
+ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+ tx.delete(OPERATIONAL, path);
+ tx.submit();
+ }
+
+ private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+ final NodeKey nodeKey = sourcePath.getKey();
final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
putData(OPERATIONAL, augmentPath, nodeAgument);
}
+ private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
+ final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+ deleteData(OPERATIONAL, augmentPath);
+ }
+
private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
final String nodeIdPattern = input.getNodeIdPattern().getValue();
final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
- final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+ final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
registerTopic(eventSourceTopic);
.setTopicId(eventSourceTopic.getTopicId())
.build();
- return Util.resultFor(cto);
+ return Util.resultRpcSuccessFor(cto);
}
@Override
@Override
public void close() {
aggregatorRpcReg.close();
+ for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
+ reg.close();
+ }
}
- public void registerTopic(final EventSourceTopic listener) {
+ private void registerTopic(final EventSourceTopic listener) {
final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
EVENT_SOURCE_TOPOLOGY_PATH,
listener,
DataBroker.DataChangeScope.SUBTREE);
- registrations.put(listener, listenerRegistration);
+ topicListenerRegistrations.put(listener, listenerRegistration);
+ }
+
+ public void register(final EventSource eventSource){
+ NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+ reg.registerPath(NodeContext.class, sourcePath);
+ routedRpcRegistrations.put(nodeKey,reg);
+ insert(sourcePath);
}
- public void register(final Node node, final NetconfEventSource netconfEventSource) {
- final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
- rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
- .registerPath(NodeContext.class, sourcePath);
- insert(sourcePath,node);
- // FIXME: Return registration object.
+ public void unRegister(final EventSource eventSource){
+ final NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+ if(removeRegistration != null){
+ removeRegistration.close();
+ remove(sourcePath);
+ }
}
+ @Override
+ public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
+ T eventSource) {
+ EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+ register(eventSource);
+ return esr;
+ }
}
+
package org.opendaylight.controller.messagebus.app.impl;
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import com.google.common.util.concurrent.Futures;
-public final class Util {
- private static final MessageDigest messageDigestTemplate = getDigestInstance();
-
- private static MessageDigest getDigestInstance() {
- try {
- return MessageDigest.getInstance("MD5");
- } catch (final NoSuchAlgorithmException e) {
- throw new RuntimeException("Unable to get MD5 instance");
- }
- }
- static String md5String(final String inputString) {
+public final class Util {
- try {
- final MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
- md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
- return new BigInteger(1, md.digest()).toString(16);
- } catch (final Exception e) {
- throw new RuntimeException("Unable to get MD5 instance");
- }
+ public static String getUUIDIdent(){
+ UUID uuid = UUID.randomUUID();
+ return uuid.toString();
}
- public static <T> Future<RpcResult<T>> resultFor(final T output) {
+ public static <T> Future<RpcResult<T>> resultRpcSuccessFor(final T output) {
final RpcResult<T> result = RpcResultBuilder.success(output).build();
return Futures.immediateFuture(result);
}
* @param wildcard
* @return
*/
- static String wildcardToRegex(final String wildcard){
+ public static String wildcardToRegex(final String wildcard){
final StringBuffer s = new StringBuffer(wildcard.length());
s.append('^');
for (final char c : wildcard.toCharArray()) {
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
+import org.opendaylight.controller.messagebus.app.impl.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
-public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener {
+public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
-
private final String nodeId;
-
+ private final Node node;
private final DOMMountPoint netconfMount;
private final DOMNotificationPublishService domPublish;
- private final NotificationsService notificationRpcService;
-
private final Set<String> activeStreams = new ConcurrentSkipListSet<>();
private final Map<String, String> urnPrefixToStreamMap;
+ private final ConcurrentHashMap<TopicId,ListenerRegistration<NetconfEventSource>> listenerRegistrationMap = new ConcurrentHashMap<>();
-
- public NetconfEventSource(final String nodeId, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
+ public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
this.netconfMount = netconfMount;
- this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class);
- this.nodeId = nodeId;
+ this.node = node;
+ this.nodeId = node.getNodeId().getValue();
this.urnPrefixToStreamMap = streamMap;
this.domPublish = publishService;
LOG.info("NetconfEventSource [{}] created.", nodeId);
@Override
public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
final NotificationPattern notificationPattern = input.getNotificationPattern();
-
- // FIXME: default language should already be regex
- final String regex = Util.wildcardToRegex(notificationPattern.getValue());
-
- final Pattern pattern = Pattern.compile(regex);
- final List<SchemaPath> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
- registerNotificationListener(matchingNotifications);
- final JoinTopicOutput output = new JoinTopicOutputBuilder().build();
- return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build());
+ final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
+ return registerNotificationListener(input.getTopicId(),matchingNotifications);
}
- private List<SchemaPath> availableNotifications() {
- // FIXME: use SchemaContextListener to get changes asynchronously
- final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
- final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
- for (final NotificationDefinition nd : availableNotifications) {
- qNs.add(nd.getPath());
+ private synchronized Future<RpcResult<JoinTopicOutput>> registerNotificationListener(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+ if(listenerRegistrationMap.containsKey(topicId)){
+ final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId;
+ return immediateFuture(RpcResultBuilder.<JoinTopicOutput>failed().withError(ErrorType.APPLICATION, errMessage).build());
}
- return qNs;
- }
-
- private void registerNotificationListener(final List<SchemaPath> notificationsToSubscribe) {
-
+ ListenerRegistration<NetconfEventSource> registration = null;
+ JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+
if(notifyService.isPresent()) {
for (final SchemaPath qName : notificationsToSubscribe) {
startSubscription(qName);
}
- // FIXME: Capture registration
- notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
+ registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
}
+
+ if(registration != null){
+ listenerRegistrationMap.put(topicId,registration);
+ joinTopicStatus = JoinTopicStatus.Up;
+ }
+ final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
+ return immediateFuture(RpcResultBuilder.success(output).build());
}
private void startSubscription(final SchemaPath path) {
final String streamName = resolveStream(path.getLastComponent());
-
- if (streamIsActive(streamName) == false) {
- LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
- startSubscription(streamName);
- }
+ startSubscription(streamName);
}
private void resubscribeToActiveStreams() {
}
private synchronized void startSubscription(final String streamName) {
- final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
- .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
- .build();
- netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
- activeStreams.add(streamName);
+ if(streamIsActive(streamName) == false){
+ LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+ final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
+ .build();
+ netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+ activeStreams.add(streamName);
+ }
}
private String resolveStream(final QName qName) {
final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
final Element element = XmlUtil.createElement(doc , "payload", namespace);
-
final DOMResult result = new DOMResult(element);
final SchemaContext context = netconfMount.getSchemaContext();
return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
}
+ private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
+ // FIXME: default language should already be regex
+ final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+ final Pattern pattern = Pattern.compile(regex);
+ return Util.expandQname(getAvailableNotifications(), pattern);
+ }
+
+ @Override
+ public void close() throws Exception {
+ for(ListenerRegistration<NetconfEventSource> registration : listenerRegistrationMap.values()){
+ registration.close();
+ }
+ }
+
+ @Override
+ public NodeKey getSourceNodeKey(){
+ return node.getKey();
+ }
+
+ @Override
+ public List<SchemaPath> getAvailableNotifications() {
+ // FIXME: use SchemaContextListener to get changes asynchronously
+ final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
+ final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+ for (final NotificationDefinition nd : availableNotifications) {
+ qNs.add(nd.getPath());
+ }
+ return qNs;
+ }
+
}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
-import com.google.common.base.Optional;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
.build();
private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
-
- private final EventSourceTopology eventSourceTopology;
private final Map<String, String> streamMap;
-
- private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
- private final ListenerRegistration<DataChangeListener> listenerReg;
+ private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
private final DOMNotificationPublishService publishService;
private final DOMMountPointService domMounts;
private final MountPointService bindingMounts;
+ private ListenerRegistration<DataChangeListener> listenerRegistration;
+ private final EventSourceRegistry eventSourceRegistry;
+
+ public static NetconfEventSourceManager create(final DataBroker dataBroker,
+ final DOMNotificationPublishService domPublish,
+ final DOMMountPointService domMount,
+ final MountPointService bindingMount,
+ final EventSourceRegistry eventSourceRegistry,
+ final List<NamespaceToStream> namespaceMapping){
+
+ final NetconfEventSourceManager eventSourceManager =
+ new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
- public NetconfEventSourceManager(final DataBroker dataStore,
- final DOMNotificationPublishService domPublish,
+ eventSourceManager.initialize(dataBroker);
+
+ return eventSourceManager;
+
+ }
+
+ private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
final DOMMountPointService domMount,
final MountPointService bindingMount,
- final EventSourceTopology eventSourceTopology,
+ final EventSourceRegistry eventSourceRegistry,
final List<NamespaceToStream> namespaceMapping) {
- listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
- this.eventSourceTopology = eventSourceTopology;
+ Preconditions.checkNotNull(domPublish);
+ Preconditions.checkNotNull(domMount);
+ Preconditions.checkNotNull(bindingMount);
+ Preconditions.checkNotNull(eventSourceRegistry);
+ Preconditions.checkNotNull(namespaceMapping);
this.streamMap = namespaceToStreamMapping(namespaceMapping);
this.domMounts = domMount;
this.bindingMounts = bindingMount;
this.publishService = domPublish;
- LOGGER.info("EventSourceManager initialized.");
+ this.eventSourceRegistry = eventSourceRegistry;
+ }
+
+ private void initialize(final DataBroker dataBroker){
+ Preconditions.checkNotNull(dataBroker);
+ listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+ LOGGER.info("NetconfEventSourceManager initialized.");
}
private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
@Override
public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+
LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
}
}
-
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
}
}
-
}
private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
// we listen on node tree, therefore we should rather throw IllegalStateException when node is null
if ( node == null ) {
- LOGGER.debug("OnDataChanged Event. Node is null.");
- return;
+ throw new IllegalStateException("Node is null");
}
if ( isNetconfNode(node) == false ) {
LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
return;
}
- if(!netconfSources.containsKey(key)) {
+ if(!eventSourceRegistration.containsKey(key)) {
createEventSource(key,node);
}
}
final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
if(netconfMount.isPresent() && bindingMount.isPresent()) {
- final String nodeId = node.getNodeId().getValue();
- final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
- eventSourceTopology.register(node,netconfEventSource);
- netconfSources.putIfAbsent(key, netconfEventSource);
+
+ final NetconfEventSource netconfEventSource =
+ new NetconfEventSource(node, streamMap, netconfMount.get(), publishService, bindingMount.get());
+ final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
+ eventSourceRegistration.putIfAbsent(key, registration);
+
}
}
return node.getAugmentation(NetconfNode.class) != null ;
}
- public boolean isEventSource(final Node node) {
- final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ private boolean isEventSource(final Node node) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
return isEventSource(netconfNode);
+
}
private boolean isEventSource(final NetconfNode node) {
+ if (node.getAvailableCapabilities() == null) {
+ return false;
+ }
+ final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
+ if(capabilities == null) {
+ return false;
+ }
for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
return true;
@Override
public void close() {
- listenerReg.close();
+ for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+ reg.close();
+ }
+ listenerRegistration.close();
}
+
}
\ No newline at end of file
import config { prefix config; revision-date 2013-04-05; }
import opendaylight-md-sal-binding {prefix sal;}
import opendaylight-md-sal-dom {prefix dom;}
-
+ import messagebus-event-source-registry {prefix esr;}
description
"Service definition for Message Bus application implementation.";
identity messagebus-app-impl {
base config:module-type;
+ config:provided-service esr:event-source-registry;
config:java-name-prefix MessageBusAppImpl;
}
-
+
augment "/config:modules/config:module/config:configuration" {
case messagebus-app-impl {
when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
*/
package org.opendaylight.controller.config.yang.messagebus.app.impl;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
import org.osgi.framework.BundleContext;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
public class MessageBusAppImplModuleFactoryTest {
DependencyResolver dependencyResolverMock;
*/
package org.opendaylight.controller.config.yang.messagebus.app.impl;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.osgi.framework.BundleContext;
-import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.notNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-
public class MessageBusAppImplModuleTest {
MessageBusAppImplModule messageBusAppImplModule;
assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
}
- @Test
- public void createInstanceTest() {
- createInstanceTestHelper();
- messageBusAppImplModule.getInstance();
- assertNotNull("AutoCloseable instance has not been created correctly.", messageBusAppImplModule.createInstance());
- }
-
- private void createInstanceTestHelper(){
- NamespaceToStream namespaceToStream = mock(NamespaceToStream.class);
- List<NamespaceToStream> listNamespaceToStreamMock = new ArrayList<>();
- listNamespaceToStreamMock.add(namespaceToStream);
- messageBusAppImplModule.setNamespaceToStream(listNamespaceToStreamMock);
- ObjectName objectName = mock(ObjectName.class);
- org.opendaylight.controller.sal.core.api.Broker domBrokerDependency = mock(Broker.class);
- org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingBrokerDependency = mock(BindingAwareBroker.class);
- when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.domBrokerJmxAttribute))).thenReturn(domBrokerDependency);
- when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.bindingBrokerJmxAttribute))).thenReturn(bindingBrokerDependency);
- messageBusAppImplModule.setBindingBroker(objectName);
- messageBusAppImplModule.setDomBroker(objectName);
- BindingAwareBroker.ProviderContext providerContextMock = mock(BindingAwareBroker.ProviderContext.class);
- doReturn(providerContextMock).when(bindingBrokerDependency).registerProvider(any(BindingAwareProvider.class));
- Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
- doReturn(providerSessionMock).when(domBrokerDependency).registerProvider(any(Provider.class));
-
- DataBroker dataBrokerMock = mock(DataBroker.class);
- doReturn(dataBrokerMock).when(providerContextMock).getSALService(DataBroker.class);
- RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
- doReturn(rpcProviderRegistryMock).when(providerContextMock).getSALService(RpcProviderRegistry.class);
- BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
- doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
- EventSourceService eventSourceServiceMock = mock(EventSourceService.class);
- doReturn(eventSourceServiceMock).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
-
- WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
- doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
- doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
- CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
- doReturn(checkedFutureMock).when(writeTransactionMock).submit();
- }
+ //TODO: create MessageBusAppImplModule.createInstance test
}
*/
package org.opendaylight.controller.messagebus.app.impl;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-
public class EventSourceTopicTest {
EventSourceTopic eventSourceTopic;
*/
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.eq;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
public class EventSourceTopologyTest {
public void setUp() throws Exception {
dataBrokerMock = mock(DataBroker.class);
rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-
}
@Test
}
private void constructorTestHelper(){
+ RpcRegistration<EventAggregatorService> aggregatorRpcReg = mock(RpcRegistration.class);
+ EventSourceService eventSourceService = mock(EventSourceService.class);
+ doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+ doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
- doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+ doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class),eq(true));
CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
doReturn(checkedFutureMock).when(writeTransactionMock).submit();
}
- @Test
- public void createTopicTest() throws Exception{
- createTopicTestHelper();
- assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
- }
+//TODO: create test for createTopic
+// public void createTopicTest() throws Exception{
+// createTopicTestHelper();
+// assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+// }
- private void createTopicTestHelper() throws Exception{
+ private void topicTestHelper() throws Exception{
constructorTestHelper();
createTopicInputMock = mock(CreateTopicInput.class);
eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
@Test
public void destroyTopicTest() throws Exception{
- createTopicTestHelper();
+ topicTestHelper();
+ //TODO: modify test when destroyTopic will be implemented
DestroyTopicInput destroyTopicInput = null;
assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
}
- @Test
- public void closeTest() throws Exception{
- BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
- doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
- doNothing().when(rpcRegistrationMock).close();
- createTopicTestHelper();
- eventSourceTopology.createTopic(createTopicInputMock);
- eventSourceTopology.close();
- verify(rpcRegistrationMock, times(1)).close();
- }
-
@Test
public void registerTest() throws Exception {
- createTopicTestHelper();
+ topicTestHelper();
Node nodeMock = mock(Node.class);
- NetconfEventSource netconfEventSourceMock = mock(NetconfEventSource.class);
-
+ EventSource eventSourceMock = mock(EventSource.class);
NodeId nodeId = new NodeId("nodeIdValue1");
nodeKey = new NodeKey(nodeId);
doReturn(nodeKey).when(nodeMock).getKey();
-
+ doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey();
BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
- doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, netconfEventSourceMock);
- eventSourceTopology.register(nodeMock, netconfEventSourceMock);
+ doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock);
+ doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+ eventSourceTopology.register(eventSourceMock);
verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
}
*/
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.notNull;
+import com.google.common.base.Optional;
public class NetconfEventSourceManagerTest {
+ private static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
NetconfEventSourceManager netconfEventSourceManager;
ListenerRegistration listenerRegistrationMock;
DOMMountPointService domMountPointServiceMock;
MountPointService mountPointServiceMock;
EventSourceTopology eventSourceTopologyMock;
AsyncDataChangeEvent asyncDataChangeEventMock;
-
+ RpcProviderRegistry rpcProviderRegistryMock;
+ EventSourceRegistry eventSourceRegistry;
@BeforeClass
public static void initTestClass() throws IllegalAccessException, InstantiationException {
}
domMountPointServiceMock = mock(DOMMountPointService.class);
mountPointServiceMock = mock(MountPointService.class);
eventSourceTopologyMock = mock(EventSourceTopology.class);
+ rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+ eventSourceRegistry = mock(EventSourceRegistry.class);
List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
listenerRegistrationMock = mock(ListenerRegistration.class);
doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
- netconfEventSourceManager = new NetconfEventSourceManager(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
- mountPointServiceMock, eventSourceTopologyMock, namespaceToStreamList);
+ netconfEventSourceManager =
+ NetconfEventSourceManager.create(dataBrokerMock,
+ domNotificationPublishServiceMock,
+ domMountPointServiceMock,
+ mountPointServiceMock,
+ eventSourceRegistry,
+ namespaceToStreamList);
}
@Test
- public void constructorTest() {
- assertNotNull("Instance has not been created correctly.", netconfEventSourceManager);
+ public void onDataChangedCreateEventSourceTestByCreateEntry() throws InterruptedException, ExecutionException {
+ onDataChangedTestHelper(true,false,true,notification_capability_prefix);
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
}
@Test
- public void onDataChangedTest() {
- AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
- Map<InstanceIdentifier, DataObject> map = new HashMap<>();
- InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
- Node dataObjectMock = mock(Node.class);
- map.put(instanceIdentifierMock, dataObjectMock);
- doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
- doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+ public void onDataChangedCreateEventSourceTestByUpdateEntry() throws InterruptedException, ExecutionException {
+ onDataChangedTestHelper(false,true,true, notification_capability_prefix);
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+ }
+
+ @Test
+ public void onDataChangedCreateEventSourceTestNotNeconf() throws InterruptedException, ExecutionException {
+ onDataChangedTestHelper(false,true,false,notification_capability_prefix);
netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
- verify(dataObjectMock, times(2)).getAugmentation(NetconfNode.class);
+ verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
}
@Test
- public void onDataChangedCreateEventSourceTest() {
- onDataChangedCreateEventSourceTestHelper();
+ public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws InterruptedException, ExecutionException {
+ onDataChangedTestHelper(false,true,true,"bad-prefix");
netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
- verify(eventSourceTopologyMock, times(1)).register(any(Node.class), any(NetconfEventSource.class));
+ verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
}
- private void onDataChangedCreateEventSourceTestHelper(){
+ private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws InterruptedException, ExecutionException{
asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
- Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+ Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
+ Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
Node dataObjectMock = mock(Node.class);
- map.put(instanceIdentifierMock, dataObjectMock);
- doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
- doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
-
+ if(create){
+ mapCreate.put(instanceIdentifierMock, dataObjectMock);
+ }
+ if(update){
+ mapUpdate.put(instanceIdentifierMock, dataObjectMock);
+ }
+ doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
+ doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
NetconfNode netconfNodeMock = mock(NetconfNode.class);
AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
- doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
- doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
- List<String> availableCapabilityList = new ArrayList<>();
- availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
- doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-
- doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+ if(isNetconf){
+ doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
+ doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+ List<String> availableCapabilityList = new ArrayList<>();
+ availableCapabilityList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
+ doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+ doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+ } else {
+ doReturn(null).when(dataObjectMock).getAugmentation(NetconfNode.class);
+ }
Optional optionalMock = mock(Optional.class);
Optional optionalBindingMountMock = mock(Optional.class);
doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+ EventSourceRegistrationImpl esrMock = mock(EventSourceRegistrationImpl.class);
+ doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
}
- @Test
- public void isEventSourceTest() {
- Node nodeMock = mock(Node.class);
- NetconfNode netconfNodeMock = mock(NetconfNode.class);
- AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
- doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
- doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
- List<String> availableCapabilityList = new ArrayList<>();
- availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
- doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
- assertTrue("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
- }
-
- @Test
- public void isNotEventSourceTest() {
- Node nodeMock = mock(Node.class);
- NetconfNode netconfNodeMock = mock(NetconfNode.class);
- AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
- doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
- doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
- List<String> availableCapabilityList = new ArrayList<>();
- availableCapabilityList.add("availableCapabilityString1");
- doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
- assertFalse("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
- }
-
- @Test
- public void closeTest() {
- netconfEventSourceManager.close();
- verify(listenerRegistrationMock, times(1)).close();
- }
}
*/
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.BindingService;
import org.opendaylight.controller.md.sal.binding.api.MountPoint;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSource;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import org.opendaylight.yangtools.yang.common.QName;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
public class NetconfEventSourceTest {
NetconfEventSource netconfEventSource;
DOMMountPoint domMountPointMock;
JoinTopicInput joinTopicInputMock;
-
- @BeforeClass
- public static void initTestClass() throws IllegalAccessException, InstantiationException {
- }
+ AsyncDataChangeEvent asyncDataChangeEventMock;
+ Node dataObjectMock;
@Before
public void setUp() throws Exception {
doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
- netconfEventSource = new NetconfEventSource("nodeId1", streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
- }
-
- @Test
- public void constructorTest() {
- assertNotNull("Instance has not been created correctly.", netconfEventSource);
- }
-
- @Test
- public void joinTopicTest() throws Exception{
- joinTopicTestHelper();
- assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
- }
-
- private void joinTopicTestHelper() throws Exception{
- joinTopicInputMock = mock(JoinTopicInput.class);
- NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
- doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
- doReturn("regexString1").when(notificationPatternMock).getValue();
-
- SchemaContext schemaContextMock = mock(SchemaContext.class);
- doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
- Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
- NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
- notificationDefinitionSet.add(notificationDefinitionMock);
-
- URI uri = new URI("uriStr1");
- QName qName = new QName(uri, "localName1");
- org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
- doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
- doReturn(schemaPath).when(notificationDefinitionMock).getPath();
-
- Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
- doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
- doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
-
- DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
- doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
- ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
- doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
- }
-
- @Test (expected=NullPointerException.class)
- public void onNotificationTest() {
- DOMNotification domNotificationMock = mock(DOMNotification.class);
- ContainerNode containerNodeMock = mock(ContainerNode.class);
- SchemaContext schemaContextMock = mock(SchemaContext.class);
- SchemaPath schemaPathMock = mock(SchemaPath.class);
- doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
- doReturn(schemaPathMock).when(domNotificationMock).getType();
- doReturn(containerNodeMock).when(domNotificationMock).getBody();
- netconfEventSource.onNotification(domNotificationMock);
+ org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
+ = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
+ org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
+ = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
+ doReturn(nodeId).when(node).getNodeId();
+ netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
}
@Test
- public void onDataChangedTest() {
+ public void onDataChangedTest(){
InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
dataChangeMap.put(brmIdent, dataObjectMock);
doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
-
+ doReturn(true).when(dataObjectMock).isConnected();
netconfEventSource.onDataChanged(asyncDataChangeEventMock);
verify(dataObjectMock, times(2)).isConnected();
}
@Test
public void onDataChangedResubscribeTest() throws Exception{
+
InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+
AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
NetconfNode dataObjectMock = mock(NetconfNode.class);
Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
dataChangeMap.put(brmIdent, dataObjectMock);
doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+ doReturn(new HashMap<InstanceIdentifier, DataObject>()).when(asyncDataChangeEventMock).getOriginalData();
doReturn(true).when(dataObjectMock).isConnected();
Set<String> localSet = getActiveStreams();
assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size());
}
+ @Test
+ public void joinTopicTest() throws Exception{
+ joinTopicTestHelper();
+ assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+ }
+
+ private void joinTopicTestHelper() throws Exception{
+ joinTopicInputMock = mock(JoinTopicInput.class);
+ TopicId topicId = new TopicId("topicID007");
+ doReturn(topicId).when(joinTopicInputMock).getTopicId();
+ NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+ doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+ doReturn("regexString1").when(notificationPatternMock).getValue();
+
+ SchemaContext schemaContextMock = mock(SchemaContext.class);
+ doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+ Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+ NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+ notificationDefinitionSet.add(notificationDefinitionMock);
+
+ URI uri = new URI("uriStr1");
+ QName qName = new QName(uri, "localName1");
+ org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+ doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+ doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+ Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+ doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+ doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+ DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+ doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+ ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
+ doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+ }
+
+//TODO: create Test for NetConfEventSource#onNotification
+
private Set getActiveStreams() throws Exception{
Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams");
nesField.setAccessible(true);
*/
package org.opendaylight.controller.messagebus.app.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
public class TopicDOMNotificationTest {
+ private static final String containerNodeBodyMockToString = "containerNodeBodyMock";
ContainerNode containerNodeBodyMock;
TopicDOMNotification topicDOMNotification;
@Before
public void setUp() throws Exception {
containerNodeBodyMock = mock(ContainerNode.class);
+ doReturn(containerNodeBodyMockToString).when(containerNodeBodyMock).toString();
topicDOMNotification = new TopicDOMNotification(containerNodeBodyMock);
}
@Test
public void getToStringTest() {
- String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMock + "]";
+ String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]";
assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
}
}
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
+/**
+ * @author ppalmar
+ *
+ */
public class UtilTest {
- @Test
- public void testMD5Hash() throws Exception {
- // empty string
- createAndAssertHash("", "d41d8cd98f00b204e9800998ecf8427e");
-
- // non-empty string
- createAndAssertHash("The Guardian", "69b929ae473ed732d5fb8e0a55a8dc8d");
-
- // the same hash for the same string
- createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c");
- createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); // one more time
-
- // different strings must have different hashes
- createAndAssertHash("orange", "fe01d67a002dfa0f3ac084298142eccd");
- createAndAssertHash("yellow", "d487dd0b55dfcacdd920ccbdaeafa351");
- }
-
- //TODO: IllegalArgumentException would be better
- @Test(expected = RuntimeException.class)
- public void testMD5HashInvalidInput() throws Exception {
- Util.md5String(null);
- }
-
@Test
public void testWildcardToRegex() throws Exception {
// empty wildcard string
public void testResultFor() throws Exception {
{
final String expectedResult = "dummy string";
- RpcResult<String> rpcResult = Util.resultFor(expectedResult).get();
+ RpcResult<String> rpcResult = Util.resultRpcSuccessFor(expectedResult).get();
assertEquals(expectedResult, rpcResult.getResult());
assertTrue(rpcResult.isSuccessful());
assertTrue(rpcResult.getErrors().isEmpty());
}
{
final Integer expectedResult = 42;
- RpcResult<Integer> rpcResult = Util.resultFor(expectedResult).get();
+ RpcResult<Integer> rpcResult = Util.resultRpcSuccessFor(expectedResult).get();
assertEquals(expectedResult, rpcResult.getResult());
assertTrue(rpcResult.isSuccessful());
assertTrue(rpcResult.getErrors().isEmpty());
}
}
- private static void createAndAssertHash(final String inString, final String expectedHash) {
- assertEquals("Incorrect hash.", expectedHash, Util.md5String(inString));
- }
-
private static void createAndAssertRegex(final String wildcardStr, final String expectedRegex) {
assertEquals("Incorrect regex string.", expectedRegex, Util.wildcardToRegex(wildcardStr));
}
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>messagebus-spi</artifactId>
+ <name>${project.artifactId}</name>
+
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-model-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ <configuration>
+ <codeGenerators>
+ <generator>
+ <codeGeneratorClass>
+ org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl
+ </codeGeneratorClass>
+ <outputBaseDir>
+ ${project.build.directory}/generated-sources/sal
+ </outputBaseDir>
+ </generator>
+ <generator>
+ <codeGeneratorClass>
+ org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+ </codeGeneratorClass>
+ <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>
+ <additionalConfiguration>
+ <namespaceToPackage1>
+ urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang
+ </namespaceToPackage1>
+ </additionalConfiguration>
+ </generator>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+ <outputBaseDir>target/site/models</outputBaseDir>
+ </generator>
+ </codeGenerators>
+ <inspectDependencies>true</inspectDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/config</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Event source is a node in topology which is able to produces notifications.
+ * To register event source you use {@link EventSourceRegistry#registerEventSource(EventSource)()}.
+ * EventSourceRegistry will request registered event source to publish notifications
+ * whenever EventSourceRegistry has been asked to publish a certain type of notifications.
+ * EventSourceRegistry will call method JoinTopic to request EventSource to publish notification.
+ * Event source must implement method JoinTopic (from superinterface {@link EventSourceService}).
+ */
+
+public interface EventSource extends EventSourceService, AutoCloseable {
+
+ /**
+ * Identifier of node associated with event source
+ *
+ * @return instance of NodeKey
+ */
+ NodeKey getSourceNodeKey();
+
+ /**
+ * List the types of notifications which source can produce.
+ *
+ * @return list of available notification
+ */
+ List<SchemaPath> getAvailableNotifications();
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * Instance of EventSourceRegistration is returned by {@link EventSourceRegistry#registerEventSource(EventSource)}
+ * and it is used to unregister EventSource.
+ *
+ */
+public interface EventSourceRegistration <T extends EventSource> extends ObjectRegistration<T>{
+
+ @Override
+ public void close();
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+/**
+ *EventSourceRegistry is used to register {@link EventSource}.
+ *
+ */
+public interface EventSourceRegistry extends AutoCloseable {
+
+ /**
+ * Registers the given EventSource for public consumption. The EventSource is
+ * associated with the node identified via {@linkEventSource#getSourceNodeKey}.
+ *
+ * @param eventSource the EventSource instance to register
+ * @return an EventSourceRegistration instance that is used to unregister the EventSource via {@link EventSourceRegistrationImpl#close()}.
+ */
+ <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource);
+
+}
\ No newline at end of file
--- /dev/null
+module messagebus-event-source-registry {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry";
+ prefix "mb-esr";
+
+ import config { prefix config; revision-date 2013-04-05; }
+
+ description
+ "Event source registry service interface definition for MessageBus";
+
+ revision "2015-04-02" {
+ description
+ "Initial revision";
+ }
+
+ identity event-source-registry {
+ base "config:service-type";
+ config:java-class "org.opendaylight.controller.messagebus.spi.EventSourceRegistry";
+ }
+
+}
<!-- Message Bus -->
<module>messagebus-api</module>
+ <module>messagebus-spi</module>
<module>messagebus-impl</module>
<module>messagebus-config</module>
</modules>
</modules>
</profile>
</profiles>
-</project>
\ No newline at end of file
+</project>