<configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
</feature>
- <!-- FIXME decouple messagebus code into messagebus and messagebus netconf -->
- <feature name='odl-message-bus' version='${project.version}'>
- <!--<feature version='${project.version}'>odl-netconf-connector</feature>-->
+ <feature name='odl-message-bus-collector' version='${project.version}'>
<bundle>mvn:org.opendaylight.controller.model/model-inventory/${mdsal.version}</bundle>
<feature version='${project.version}'>odl-mdsal-broker</feature>
<bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/messagebus-spi/${project.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/messagebus-util/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
<configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
</feature>
<artifactId>messagebus-impl</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<name>messagebus-app-impl</name>
<type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
<binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
- <type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
<name>binding-osgi-broker</name>
</binding-broker>
- <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
- <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
- <name>dom-broker</name>
- </dom-broker>
- <namespace-to-stream>
- <urn-prefix>urn:ietf:params:xml:ns:yang:smiv2</urn-prefix>
- <stream-name>SNMP</stream-name>
- </namespace-to-stream>
- <namespace-to-stream>
- <urn-prefix>urn:ietf:params:xml:ns:yang:ietf-syslog-notification</urn-prefix>
- <stream-name>SYSLOG</stream-name>
- </namespace-to-stream>
</module>
</modules>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>messagebus-api</artifactId>\r
- <version>1.3.0-SNAPSHOT</version>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>messagebus-util</artifactId>\r
</dependency>\r
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>messagebus-spi</artifactId>\r
- <version>1.3.0-SNAPSHOT</version>\r
</dependency>\r
- <!--<dependency>-->\r
- <!--<groupId>org.opendaylight.controller</groupId>-->\r
- <!--<artifactId>sal-netconf-connector</artifactId>-->\r
- <!--</dependency>-->\r
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>sal-binding-config</artifactId>\r
*/
package org.opendaylight.controller.config.yang.messagebus.app.impl;
-import java.util.HashSet;
-import java.util.Set;
-
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-//import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
-import org.opendaylight.controller.messagebus.spi.EventSource;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.messagebus.app.util.Providers;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
@Override
public java.lang.AutoCloseable createInstance() {
-
final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
- final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
- final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
- final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
- final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class);
- final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
-// final NetconfEventSourceManager netconfEventSourceManager
-// = NetconfEventSourceManager.create(dataBroker,
-// domPublish,
-// domMount,
-// mountPointService,
-// eventSourceRegistryWrapper,
-// getNamespaceToStream());
-// eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
+ final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
LOGGER.info("Messagebus initialized");
- return eventSourceRegistryWrapper;
-
+ return eventSourceTopology;
}
- //TODO: separate NetconfEventSource into separate bundle, remove this wrapper, return EventSourceTopology directly as EventSourceRegistry
- private class EventSourceRegistryWrapper implements EventSourceRegistry{
-
- private final EventSourceRegistry baseEventSourceRegistry;
- private final Set<AutoCloseable> autoCloseables = new HashSet<>();
-
- public EventSourceRegistryWrapper(EventSourceRegistry baseEventSourceRegistry) {
- this.baseEventSourceRegistry = baseEventSourceRegistry;
- }
-
- public void addAutoCloseable(AutoCloseable ac){
- Preconditions.checkNotNull(ac);
- autoCloseables.add(ac);
- }
-
- @Override
- public void close() throws Exception {
- for(AutoCloseable ac : autoCloseables){
- ac.close();
- }
- baseEventSourceRegistry.close();
- }
-
- @Override
- public <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource) {
- return this.baseEventSourceRegistry.registerEventSource(eventSource);
- }
-
- }
}
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.app.util.Util;
import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import static com.google.common.util.concurrent.Futures.immediateFuture;
-//
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.Date;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Set;
-//import java.util.concurrent.Future;
-//import java.util.regex.Pattern;
-//
-//import javax.xml.stream.XMLStreamException;
-//import javax.xml.transform.dom.DOMResult;
-//import javax.xml.transform.dom.DOMSource;
-//
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-//import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
-//import org.opendaylight.controller.messagebus.app.impl.Util;
-//import org.opendaylight.controller.messagebus.spi.EventSource;
-//import org.opendaylight.controller.config.util.xml.XmlUtil;
-//import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.common.RpcResult;
-//import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-//import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
-//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-//import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-//import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-//import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.w3c.dom.Document;
-//import org.w3c.dom.Element;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.base.Preconditions;
-//import com.google.common.base.Throwables;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class NetconfEventSource implements EventSource, DOMNotificationListener {
-//
-// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
-//
-// private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
-// private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
-// private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id"));
-// private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
-// private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
-//
-// private final String nodeId;
-// private final Node node;
-//
-// private final DOMMountPoint netconfMount;
-// private final MountPoint mountPoint;
-// private final DOMNotificationPublishService domPublish;
-//
-// private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
-// private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
-//
-// public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
-// this.netconfMount = Preconditions.checkNotNull(netconfMount);
-// this.mountPoint = Preconditions.checkNotNull(mountPoint);
-// this.node = Preconditions.checkNotNull(node);
-// this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
-// this.domPublish = Preconditions.checkNotNull(publishService);
-// this.nodeId = node.getNodeId().getValue();
-// this.initializeNotificationTopicRegistrationList();
-//
-// LOG.info("NetconfEventSource [{}] created.", this.nodeId);
-// }
-//
-// private void initializeNotificationTopicRegistrationList() {
-// notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
-// Optional<Map<String, Stream>> streamMap = getAvailableStreams();
-// if(streamMap.isPresent()){
-// LOG.debug("Stream configuration compare...");
-// for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
-// final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
-// LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
-// if(streamMap.get().containsKey(streamName)){
-// LOG.debug("Stream containig on device");
-// notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
-// }
-// }
-// }
-// }
-//
-// private Optional<Map<String, Stream>> getAvailableStreams(){
-//
-// Map<String,Stream> streamMap = null;
-// InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
-// Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
-//
-// if(dataBroker.isPresent()){
-// LOG.debug("GET Available streams ...");
-// ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
-// CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
-//
-// try {
-// Optional<Streams> streams = checkFeature.checkedGet();
-// if(streams.isPresent()){
-// streamMap = new HashMap<>();
-// for(Stream stream : streams.get().getStream()){
-// LOG.debug("*** find stream {}", stream.getName().getValue());
-// streamMap.put(stream.getName().getValue(), stream);
-// }
-// }
-// } catch (ReadFailedException e) {
-// LOG.warn("Can not read streams for node {}",this.nodeId);
-// }
-//
-// } else {
-// LOG.warn("No databroker on node {}", this.nodeId);
-// }
-//
-// return Optional.fromNullable(streamMap);
-// }
-//
-// @Override
-// public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
-// LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
-// final NotificationPattern notificationPattern = input.getNotificationPattern();
-// final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
-// return registerTopic(input.getTopicId(),matchingNotifications);
-//
-// }
-//
-// @Override
-// public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
-// for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
-// reg.unRegisterNotificationTopic(input.getTopicId());
-// }
-// return Util.resultRpcSuccessFor((Void) null) ;
-// }
-//
-// private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
-// LOG.debug("Join topic {} - register");
-// JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
-// if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
-// LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() );
-// final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
-// if(notifyService.isPresent()){
-// int registeredNotificationCount = 0;
-// for(SchemaPath schemaNotification : notificationsToSubscribe){
-// for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
-// LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName());
-// if(reg.checkNotificationPath(schemaNotification)){
-// LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
-// boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
-// if(regSuccess){
-// registeredNotificationCount = registeredNotificationCount +1;
-// }
-// }
-// }
-// }
-// if(registeredNotificationCount > 0){
-// joinTopicStatus = JoinTopicStatus.Up;
-// }
-// } else {
-// LOG.warn("NO DOMNotification service on node {}", this.nodeId);
-// }
-// } else {
-// LOG.debug("Notifications to subscribe has NOT found");
-// }
-//
-// final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
-// return immediateFuture(RpcResultBuilder.success(output).build());
-//
-// }
-//
-// public void reActivateStreams(){
-// for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
-// LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
-// reg.reActivateNotificationSource();
-// }
-// }
-//
-// public void deActivateStreams(){
-// for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
-// LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
-// reg.deActivateNotificationSource();
-// }
-// }
-//
-// @Override
-// public void onNotification(final DOMNotification notification) {
-// SchemaPath notificationPath = notification.getType();
-// Date notificationEventTime = null;
-// if(notification instanceof DOMEvent){
-// notificationEventTime = ((DOMEvent) notification).getEventTime();
-// }
-// for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){
-// ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
-// if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){
-//
-// if(notifReg instanceof StreamNotificationTopicRegistration){
-// StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg;
-// streamReg.setLastEventTime(notificationEventTime);
-// }
-//
-// for(TopicId topicId : topicIdsForNotification){
-// publishNotification(notification, topicId);
-// LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
-// }
-//
-// }
-// }
-// }
-//
-// private void publishNotification(final DOMNotification notification, TopicId topicId){
-// final ContainerNode topicNotification = Builders.containerBuilder()
-// .withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
-// .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
-// .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId))
-// .withChild(encapsulate(notification))
-// .build();
-// try {
-// domPublish.putNotification(new TopicDOMNotification(topicNotification));
-// } catch (final InterruptedException e) {
-// throw Throwables.propagate(e);
-// }
-// }
-//
-// private AnyXmlNode encapsulate(final DOMNotification body) {
-// // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
-// final Document doc = XmlUtil.newDocument();
-// final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
-// final Element element = XmlUtil.createElement(doc , "payload", namespace);
-//
-// final DOMResult result = new DOMResult(element);
-//
-// final SchemaContext context = getDOMMountPoint().getSchemaContext();
-// final SchemaPath schemaPath = body.getType();
-// try {
-// NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
-// return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG)
-// .withValue(new DOMSource(element))
-// .build();
-// } catch (IOException | XMLStreamException e) {
-// LOG.error("Unable to encapsulate notification.",e);
-// throw Throwables.propagate(e);
-// }
-// }
-//
-// private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
-// // FIXME: default language should already be regex
-// final String regex = Util.wildcardToRegex(notificationPattern.getValue());
-//
-// final Pattern pattern = Pattern.compile(regex);
-// List<SchemaPath> availableNotifications = getAvailableNotifications();
-// if(availableNotifications == null || availableNotifications.isEmpty()){
-// return null;
-// }
-// return Util.expandQname(availableNotifications, pattern);
-// }
-//
-// @Override
-// public void close() throws Exception {
-// for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){
-// streamReg.close();
-// }
-// }
-//
-// @Override
-// public NodeKey getSourceNodeKey(){
-// return getNode().getKey();
-// }
-//
-// @Override
-// public List<SchemaPath> getAvailableNotifications() {
-//
-// final List<SchemaPath> availNotifList = new ArrayList<>();
-// // add Event Source Connection status notification
-// availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
-//
-// // FIXME: use SchemaContextListener to get changes asynchronously
-// final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications();
-// // add all known notifications from netconf device
-// for (final NotificationDefinition nd : availableNotifications) {
-// availNotifList.add(nd.getPath());
-// }
-// return availNotifList;
-// }
-//
-// public Node getNode() {
-// return node;
-// }
-//
-// DOMMountPoint getDOMMountPoint() {
-// return netconfMount;
-// }
-//
-// MountPoint getMountPoint() {
-// return mountPoint;
-// }
-//
-// NetconfNode getNetconfNode(){
-// return node.getAugmentation(NetconfNode.class);
-// }
-//
-//}
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.concurrent.ConcurrentHashMap;
-//
-//import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-//import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.binding.DataObject;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.base.Preconditions;
-//
-//public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
-//
-// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class);
-// private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
-// private static final InstanceIdentifier<Node> NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class)
-// .child(Topology.class, NETCONF_TOPOLOGY_KEY)
-// .child(Node.class);
-//
-// private final Map<String, String> streamMap;
-// private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
-// private final DOMNotificationPublishService publishService;
-// private final DOMMountPointService domMounts;
-// private final MountPointService mountPointService;
-// private ListenerRegistration<DataChangeListener> listenerRegistration;
-// private final EventSourceRegistry eventSourceRegistry;
-//
-// public static NetconfEventSourceManager create(final DataBroker dataBroker,
-// final DOMNotificationPublishService domPublish,
-// final DOMMountPointService domMount,
-// final MountPointService bindingMount,
-// final EventSourceRegistry eventSourceRegistry,
-// final List<NamespaceToStream> namespaceMapping){
-//
-// final NetconfEventSourceManager eventSourceManager =
-// new NetconfEventSourceManager(domPublish, domMount,bindingMount, eventSourceRegistry, namespaceMapping);
-//
-// eventSourceManager.initialize(dataBroker);
-//
-// return eventSourceManager;
-//
-// }
-//
-// private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
-// final DOMMountPointService domMount,
-// final MountPointService bindingMount,
-// final EventSourceRegistry eventSourceRegistry,
-// final List<NamespaceToStream> namespaceMapping) {
-//
-// Preconditions.checkNotNull(domPublish);
-// Preconditions.checkNotNull(domMount);
-// Preconditions.checkNotNull(bindingMount);
-// Preconditions.checkNotNull(eventSourceRegistry);
-// Preconditions.checkNotNull(namespaceMapping);
-// this.streamMap = namespaceToStreamMapping(namespaceMapping);
-// this.domMounts = domMount;
-// this.mountPointService = bindingMount;
-// this.publishService = domPublish;
-// this.eventSourceRegistry = eventSourceRegistry;
-// }
-//
-// private void initialize(final DataBroker dataBroker){
-// Preconditions.checkNotNull(dataBroker);
-// listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
-// LOG.info("NetconfEventSourceManager initialized.");
-// }
-//
-// private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
-// final Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
-//
-// for (final NamespaceToStream nToS : namespaceMapping) {
-// streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
-// }
-//
-// return streamMap;
-// }
-//
-// @Override
-// public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-//
-// LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
-// for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
-// if (changeEntry.getValue() instanceof Node) {
-// nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue());
-// }
-// }
-//
-// for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
-// if (changeEntry.getValue() instanceof Node) {
-// nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
-// }
-// }
-//
-// for(InstanceIdentifier<?> removePath : event.getRemovedPaths()){
-// DataObject removeObject = event.getOriginalData().get(removePath);
-// if(removeObject instanceof Node){
-// nodeRemoved(removePath);
-// }
-// }
-//
-// }
-//
-// private void nodeCreated(final InstanceIdentifier<?> key, final Node node){
-// Preconditions.checkNotNull(key);
-// if(validateNode(node) == false){
-// LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
-// return;
-// }
-// LOG.info("Netconf event source [{}] is creating...", key.toString());
-// NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this);
-// if(nesr != null){
-// NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr);
-// if(nesrOld != null){
-// nesrOld.close();
-// }
-// }
-// }
-//
-// private void nodeUpdated(final InstanceIdentifier<?> key, final Node node){
-// Preconditions.checkNotNull(key);
-// if(validateNode(node) == false){
-// LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
-// return;
-// }
-//
-// LOG.info("Netconf event source [{}] is updating...", key.toString());
-// NetconfEventSourceRegistration nesr = registrationMap.get(key);
-// if(nesr != null){
-// nesr.updateStatus();
-// } else {
-// nodeCreated(key, node);
-// }
-// }
-//
-// private void nodeRemoved(final InstanceIdentifier<?> key){
-// Preconditions.checkNotNull(key);
-// LOG.info("Netconf event source [{}] is removing...", key.toString());
-// NetconfEventSourceRegistration nesr = registrationMap.remove(key);
-// if(nesr != null){
-// nesr.close();
-// }
-// }
-//
-// private boolean validateNode(final Node node){
-// if(node == null){
-// return false;
-// }
-// return isNetconfNode(node);
-// }
-//
-// Map<String, String> getStreamMap() {
-// return streamMap;
-// }
-//
-// DOMNotificationPublishService getPublishService() {
-// return publishService;
-// }
-//
-// DOMMountPointService getDomMounts() {
-// return domMounts;
-// }
-//
-// EventSourceRegistry getEventSourceRegistry() {
-// return eventSourceRegistry;
-// }
-//
-// MountPointService getMountPointService() {
-// return mountPointService;
-// }
-//
-// private boolean isNetconfNode(final Node node) {
-// return node.getAugmentation(NetconfNode.class) != null ;
-// }
-//
-// @Override
-// public void close() {
-// listenerRegistration.close();
-// for(final NetconfEventSourceRegistration reg : registrationMap.values()){
-// reg.close();
-// }
-// registrationMap.clear();
-// }
-//
-//}
\ No newline at end of file
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.List;
-//
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.base.Preconditions;
-//
-///**
-// * Helper class to keep connection status of netconf node and event source registration object
-// *
-// */
-//public class NetconfEventSourceRegistration implements AutoCloseable{
-//
-// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class);
-// private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
-// .node(NetworkTopology.QNAME)
-// .node(Topology.QNAME)
-// .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
-// .node(Node.QNAME)
-// .build();
-// private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
-// private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
-//
-// private final Node node;
-// private final InstanceIdentifier<?> instanceIdent;
-// private final NetconfEventSourceManager netconfEventSourceManager;
-// private ConnectionStatus currentNetconfConnStatus;
-// private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
-//
-// public static NetconfEventSourceRegistration create(final InstanceIdentifier<?> instanceIdent, final Node node,
-// final NetconfEventSourceManager netconfEventSourceManager){
-// Preconditions.checkNotNull(instanceIdent);
-// Preconditions.checkNotNull(node);
-// Preconditions.checkNotNull(netconfEventSourceManager);
-// if(isEventSource(node) == false){
-// return null;
-// }
-// NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager);
-// nesr.updateStatus();
-// LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
-// return nesr;
-// }
-//
-// private static boolean isEventSource(final Node node) {
-// final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
-// if(netconfNode == null){
-// return false;
-// }
-// if (netconfNode.getAvailableCapabilities() == null) {
-// return false;
-// }
-// final List<String> capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability();
-// if(capabilities == null || capabilities.isEmpty()) {
-// return false;
-// }
-// for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) {
-// if(capability.startsWith(NotificationCapabilityPrefix)) {
-// return true;
-// }
-// }
-//
-// return false;
-// }
-//
-// private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node, final NetconfEventSourceManager netconfEventSourceManager) {
-// this.instanceIdent = instanceIdent;
-// this.node = node;
-// this.netconfEventSourceManager = netconfEventSourceManager;
-// this.eventSourceRegistration =null;
-// }
-//
-// public Node getNode() {
-// return node;
-// }
-//
-// Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
-// return Optional.fromNullable(eventSourceRegistration);
-// }
-//
-// NetconfNode getNetconfNode(){
-// return node.getAugmentation(NetconfNode.class);
-// }
-//
-// void updateStatus(){
-// ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus();
-// LOG.info("Change status on node {}, new status is {}",this.node.getNodeId().getValue(),netconfConnStatus);
-// if(netconfConnStatus.equals(currentNetconfConnStatus)){
-// return;
-// }
-// changeStatus(netconfConnStatus);
-// }
-//
-// private boolean checkConnectionStatusType(ConnectionStatus status){
-// if( status == ConnectionStatus.Connected
-// || status == ConnectionStatus.Connecting
-// || status == ConnectionStatus.UnableToConnect){
-// return true;
-// }
-// return false;
-// }
-//
-// private void changeStatus(ConnectionStatus newStatus){
-// Preconditions.checkNotNull(newStatus);
-// if(checkConnectionStatusType(newStatus) == false){
-// throw new IllegalStateException("Unknown new Netconf Connection Status");
-// }
-// if(this.currentNetconfConnStatus == null){
-// if (newStatus == ConnectionStatus.Connected){
-// registrationEventSource();
-// }
-// } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting){
-// if (newStatus == ConnectionStatus.Connected){
-// if(this.eventSourceRegistration == null){
-// registrationEventSource();
-// } else {
-// // reactivate stream on registered event source (invoke publish notification about connection)
-// this.eventSourceRegistration.getInstance().reActivateStreams();
-// }
-// }
-// } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
-//
-// if(newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect){
-// // deactivate streams on registered event source (invoke publish notification about connection)
-// this.eventSourceRegistration.getInstance().deActivateStreams();
-// }
-// } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect){
-// if(newStatus == ConnectionStatus.Connected){
-// if(this.eventSourceRegistration == null){
-// registrationEventSource();
-// } else {
-// // reactivate stream on registered event source (invoke publish notification about connection)
-// this.eventSourceRegistration.getInstance().reActivateStreams();
-// }
-// }
-// } else {
-// throw new IllegalStateException("Unknown current Netconf Connection Status");
-// }
-// this.currentNetconfConnStatus = newStatus;
-// }
-//
-// private void registrationEventSource(){
-// final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService().getMountPoint(instanceIdent);
-// final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts().getMountPoint(domMountPath(node.getNodeId()));
-// EventSourceRegistration<NetconfEventSource> registration = null;
-// if(domMountPoint.isPresent() && mountPoint.isPresent()) {
-// final NetconfEventSource netconfEventSource = new NetconfEventSource(
-// node,
-// netconfEventSourceManager.getStreamMap(),
-// domMountPoint.get(),
-// mountPoint.get(),
-// netconfEventSourceManager.getPublishService());
-// registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
-// LOG.info("Event source {} has been registered",node.getNodeId().getValue());
-// }
-// this.eventSourceRegistration = registration;
-// }
-//
-// private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
-// return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
-// }
-//
-// private void closeEventSourceRegistration(){
-// if(getEventSourceRegistration().isPresent()){
-// getEventSourceRegistration().get().close();
-// }
-// }
-//
-// @Override
-// public void close() {
-// closeEventSourceRegistration();
-// }
-//
-//}
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.ArrayList;
-//import java.util.Date;
-//import java.util.List;
-//import java.util.concurrent.ConcurrentHashMap;
-//
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-//import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-//import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
-//import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
-//
-// private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
-// private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
-// private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
-// private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime"));
-//
-// final private DOMMountPoint domMountPoint;
-// final private String nodeId;
-// final private NetconfEventSource netconfEventSource;
-// final private Stream stream;
-// private Date lastEventTime;
-//
-// private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
-// private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
-//
-// public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) {
-// super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
-// this.domMountPoint = netconfEventSource.getDOMMountPoint();
-// this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
-// this.netconfEventSource = netconfEventSource;
-// this.stream = stream;
-// this.lastEventTime= null;
-// setReplaySupported(this.stream.isReplaySupport());
-// setActive(false);
-// LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
-// }
-//
-// void activateNotificationSource() {
-// if(isActive() == false){
-// LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
-// final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-// .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()))
-// .build();
-// CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-// try {
-// csFuture.checkedGet();
-// setActive(true);
-// } catch (DOMRpcException e) {
-// LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
-// setActive(false);
-// return;
-// }
-// } else {
-// LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
-// }
-// }
-//
-// void reActivateNotificationSource(){
-// if(isActive()){
-// LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
-// DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder =
-// Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-// .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
-// if(isReplaySupported() && this.getLastEventTime() != null){
-// inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
-// }
-// final ContainerNode input = inputBuilder.build();
-// CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-// try {
-// csFuture.checkedGet();
-// setActive(true);
-// } catch (DOMRpcException e) {
-// LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
-// setActive(false);
-// return;
-// }
-// }
-// }
-//
-// @Override
-// void deActivateNotificationSource() {
-// // no operations need
-// }
-//
-// private void closeStream() {
-// if(isActive()){
-// for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
-// reg.close();
-// }
-// notificationRegistrationMap.clear();
-// notificationTopicMap.clear();
-// setActive(false);
-// }
-// }
-//
-// private String getStreamName() {
-// return getSourceName();
-// }
-//
-// @Override
-// ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
-// return notificationTopicMap.get(notificationPath);
-// }
-//
-// @Override
-// boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
-//
-// if(checkNotificationPath(notificationPath) == false){
-// LOG.debug("Bad SchemaPath for notification try to register");
-// return false;
-// }
-//
-// final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
-// if(notifyService.isPresent() == false){
-// LOG.debug("DOMNotificationService is not present");
-// return false;
-// }
-//
-// activateNotificationSource();
-// if(isActive() == false){
-// LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString());
-// return false;
-// }
-//
-// ListenerRegistration<NetconfEventSource> registration =
-// notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
-// notificationRegistrationMap.put(notificationPath, registration);
-// ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
-// if(topicIds == null){
-// topicIds = new ArrayList<>();
-// topicIds.add(topicId);
-// } else {
-// if(topicIds.contains(topicId) == false){
-// topicIds.add(topicId);
-// }
-// }
-//
-// notificationTopicMap.put(notificationPath, topicIds);
-// return true;
-// }
-//
-// @Override
-// synchronized void unRegisterNotificationTopic(TopicId topicId) {
-// List<SchemaPath> notificationPathToRemove = new ArrayList<>();
-// for(SchemaPath notifKey : notificationTopicMap.keySet()){
-// ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
-// if(topicList != null){
-// topicList.remove(topicId);
-// if(topicList.isEmpty()){
-// notificationPathToRemove.add(notifKey);
-// }
-// }
-// }
-// for(SchemaPath notifKey : notificationPathToRemove){
-// notificationTopicMap.remove(notifKey);
-// ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
-// if(reg != null){
-// reg.close();
-// }
-// }
-// }
-//
-// Optional<Date> getLastEventTime() {
-// return Optional.fromNullable(lastEventTime);
-// }
-//
-//
-// void setLastEventTime(Date lastEventTime) {
-// this.lastEventTime = lastEventTime;
-// }
-//
-// @Override
-// public void close() throws Exception {
-// closeStream();
-// }
-//
-//}
description
"Service definition for Message Bus application implementation.";
-
+
revision "2015-02-03" {
description "Second revision. Message Bus opensourcing";
}
augment "/config:modules/config:module/config:configuration" {
case messagebus-app-impl {
when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
-
+
container binding-broker {
uses config:service-ref {
refine type {
}
}
- container dom-broker {
- uses config:service-ref {
- refine type {
- mandatory true;
- config:required-identity dom:dom-broker-osgi-registry;
- }
- }
- }
-
- list namespace-to-stream {
- key urn-prefix;
-
- leaf urn-prefix {
- type string;
- }
-
- leaf stream-name {
- type string;
- }
- }
- }
- }
-
- augment "/config:modules/config:module/config:state" {
- case messagebus-app-impl {
- when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
}
}
+
}
\ No newline at end of file
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Matchers.eq;
-//import static org.mockito.Matchers.notNull;
-//import static org.mockito.Mockito.doReturn;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//
-//import org.junit.Before;
-//import org.junit.BeforeClass;
-//import org.junit.Test;
-//import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-//import org.opendaylight.controller.messagebus.spi.EventSource;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-//import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.binding.DataObject;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class NetconfEventSourceManagerTest {
-//
-// NetconfEventSourceManager netconfEventSourceManager;
-// ListenerRegistration listenerRegistrationMock;
-// DOMMountPointService domMountPointServiceMock;
-// MountPointService mountPointServiceMock;
-// EventSourceTopology eventSourceTopologyMock;
-// AsyncDataChangeEvent asyncDataChangeEventMock;
-// RpcProviderRegistry rpcProviderRegistryMock;
-// EventSourceRegistry eventSourceRegistry;
-// @BeforeClass
-// public static void initTestClass() throws IllegalAccessException, InstantiationException {
-// }
-//
-// @Before
-// public void setUp() throws Exception {
-// DataBroker dataBrokerMock = mock(DataBroker.class);
-// DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
-// domMountPointServiceMock = mock(DOMMountPointService.class);
-// mountPointServiceMock = mock(MountPointService.class);
-// eventSourceTopologyMock = mock(EventSourceTopology.class);
-// rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-// eventSourceRegistry = mock(EventSourceRegistry.class);
-// List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
-//
-// listenerRegistrationMock = mock(ListenerRegistration.class);
-// doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
-//
-// Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
-// doReturn(true).when(optionalDomMountServiceMock).isPresent();
-// doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
-//
-// DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
-// doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
-//
-//
-// Optional optionalBindingMountMock = mock(Optional.class);
-// doReturn(true).when(optionalBindingMountMock).isPresent();
-//
-// MountPoint mountPointMock = mock(MountPoint.class);
-// doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
-// doReturn(mountPointMock).when(optionalBindingMountMock).get();
-//
-// Optional optionalMpDataBroker = mock(Optional.class);
-// DataBroker mpDataBroker = mock(DataBroker.class);
-// doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
-// doReturn(true).when(optionalMpDataBroker).isPresent();
-// doReturn(mpDataBroker).when(optionalMpDataBroker).get();
-//
-// ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
-// doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
-// CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
-// InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
-// doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
-// Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
-// doReturn(avStreams).when(checkFeature).checkedGet();
-//
-// EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
-//
-// netconfEventSourceManager =
-// NetconfEventSourceManager.create(dataBrokerMock,
-// domNotificationPublishServiceMock,
-// domMountPointServiceMock,
-// mountPointServiceMock,
-// eventSourceRegistry,
-// namespaceToStreamList);
-// }
-//
-// @Test
-// public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception {
-// onDataChangedTestHelper(true,false,true,NetconfTestUtils.notification_capability_prefix);
-// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-// verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
-// }
-//
-// @Test
-// public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception {
-// onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix);
-// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-// verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
-// }
-//
-// @Test
-// public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception {
-// onDataChangedTestHelper(false,true,false,NetconfTestUtils.notification_capability_prefix);
-// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-// verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
-// }
-//
-// @Test
-// public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception {
-// onDataChangedTestHelper(true,false,true,"bad-prefix");
-// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-// verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
-// }
-//
-// private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{
-// asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-// Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
-// Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
-//
-// Node node01;
-// String nodeId = "Node01";
-// doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
-// doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
-//
-// if(isNetconf){
-// node01 = NetconfTestUtils.getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix);
-//
-// } else {
-// node01 = NetconfTestUtils.getNode(nodeId);
-// }
-//
-// if(create){
-// mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
-// }
-// if(update){
-// mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
-// }
-//
-// }
-//
-//}
\ No newline at end of file
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//
-//import static org.junit.Assert.assertNotNull;
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Mockito.doReturn;
-//import static org.mockito.Mockito.mock;
-//
-//import java.net.URI;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.opendaylight.controller.md.sal.binding.api.BindingService;
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMService;
-//import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-//import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-//import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class NetconfEventSourceTest {
-//
-// NetconfEventSource netconfEventSource;
-// DOMMountPoint domMountPointMock;
-// MountPoint mountPointMock;
-// JoinTopicInput joinTopicInputMock;
-//
-// @Before
-// public void setUp() throws Exception {
-// Map<String, String> streamMap = new HashMap<>();
-// streamMap.put("uriStr1", "string2");
-// domMountPointMock = mock(DOMMountPoint.class);
-// mountPointMock = mock(MountPoint.class);
-// DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
-// RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
-// Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
-// NotificationsService notificationsServiceMock = mock(NotificationsService.class);
-// doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-//
-// Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
-// DataBroker mpDataBroker = mock(DataBroker.class);
-// doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
-// doReturn(true).when(optionalMpDataBroker).isPresent();
-// doReturn(mpDataBroker).when(optionalMpDataBroker).get();
-//
-// ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
-// doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
-// CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
-// InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
-// doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
-// Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
-// doReturn(avStreams).when(checkFeature).checkedGet();
-//
-// netconfEventSource = new NetconfEventSource(
-// NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix),
-// streamMap,
-// domMountPointMock,
-// mountPointMock ,
-// domNotificationPublishServiceMock);
-//
-// }
-//
-// @Test
-// public void joinTopicTest() throws Exception{
-// joinTopicTestHelper();
-// assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
-// }
-//
-// private void joinTopicTestHelper() throws Exception{
-// joinTopicInputMock = mock(JoinTopicInput.class);
-// TopicId topicId = new TopicId("topicID007");
-// doReturn(topicId).when(joinTopicInputMock).getTopicId();
-// NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
-// doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
-// doReturn("uriStr1").when(notificationPatternMock).getValue();
-//
-// SchemaContext schemaContextMock = mock(SchemaContext.class);
-// doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
-// Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
-// NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
-// notificationDefinitionSet.add(notificationDefinitionMock);
-//
-// URI uri = new URI("uriStr1");
-// QName qName = new QName(uri, "localName1");
-// org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
-// doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
-// doReturn(schemaPath).when(notificationDefinitionMock).getPath();
-//
-// Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
-// doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
-// doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
-//
-// DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
-// doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
-// ListenerRegistration<NetconfEventSource> listenerRegistrationMock = (ListenerRegistration<NetconfEventSource>)mock(ListenerRegistration.class);
-// doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class));
-//
-// Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
-// doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
-// doReturn(true).when(optionalMock).isPresent();
-// DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
-// doReturn(domRpcServiceMock).when(optionalMock).get();
-// CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
-// doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
-//
-// }
-//
-//}
\ No newline at end of file
+++ /dev/null
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//
-//import com.google.common.base.Optional;
-//
-//public final class NetconfTestUtils {
-//
-// public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
-//
-// private NetconfTestUtils() {
-// }
-//
-// public static Node getNetconfNode(String nodeIdent,String hostName,ConnectionStatus cs, String notificationCapabilityPrefix){
-//
-// DomainName dn = new DomainName(hostName);
-// Host host = new Host(dn);
-//
-// List<String> avCapList = new ArrayList<>();
-// avCapList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
-// AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build();
-// NetconfNode nn = new NetconfNodeBuilder()
-// .setConnectionStatus(cs)
-// .setHost(host)
-// .setAvailableCapabilities(avCaps)
-// .build();
-//
-// NodeId nodeId = new NodeId(nodeIdent);
-// NodeKey nk = new NodeKey(nodeId);
-// NodeBuilder nb = new NodeBuilder();
-// nb.setKey(nk);
-//
-// nb.addAugmentation(NetconfNode.class, nn);
-// return nb.build();
-// }
-//
-// public static Node getNode(String nodeIdent){
-// NodeId nodeId = new NodeId(nodeIdent);
-// NodeKey nk = new NodeKey(nodeId);
-// NodeBuilder nb = new NodeBuilder();
-// nb.setKey(nk);
-// return nb.build();
-// }
-//
-// public static InstanceIdentifier<Node> getInstanceIdentifier(Node node){
-// TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
-// InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(NetworkTopology.class)
-// .child(Topology.class, NETCONF_TOPOLOGY_KEY)
-// .child(Node.class, node.getKey());
-// return nodeII;
-// }
-//
-// public static Optional<Streams> getAvailableStream(String Name, boolean replaySupport){
-// Stream stream = new StreamBuilder()
-// .setName(new StreamNameType(Name))
-// .setReplaySupport(replaySupport)
-// .build();
-// List<Stream> streamList = new ArrayList<>();
-// streamList.add(stream);
-// Streams streams = new StreamsBuilder().setStream(streamList).build();
-// return Optional.of(streams);
-// }
-//
-//}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.\r
+\r
+This program and the accompanying materials are made available under the\r
+terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+and is available at http://www.eclipse.org/legal/epl-v10.html\r
+-->\r
+<project xmlns="http://maven.apache.org/POM/4.0.0"\r
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+ <modelVersion>4.0.0</modelVersion>\r
+\r
+ <parent>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-parent</artifactId>\r
+ <version>1.3.0-SNAPSHOT</version>\r
+ </parent>\r
+\r
+ <artifactId>messagebus-util</artifactId>\r
+ <name>${project.artifactId}</name>\r
+\r
+ <packaging>bundle</packaging>\r
+\r
+ <dependencies>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-core-api</artifactId>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-binding-api</artifactId>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>messagebus-api</artifactId>\r
+ </dependency>\r
+ <!-- Testing Dependencies -->\r
+ <dependency>\r
+ <groupId>junit</groupId>\r
+ <artifactId>junit</artifactId>\r
+ <scope>test</scope>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>\r
+ <artifactId>jersey-test-framework-provider-grizzly2</artifactId>\r
+ <scope>test</scope>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.mockito</groupId>\r
+ <artifactId>mockito-all</artifactId>\r
+ <scope>test</scope>\r
+ </dependency>\r
+ </dependencies>\r
+</project>\r
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.config.yang.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.app.util;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.app.util;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.app.util;
import java.util.ArrayList;
import java.util.List;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.app.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.app.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
<module>messagebus-api</module>
<module>messagebus-spi</module>
<module>messagebus-impl</module>
+ <module>messagebus-util</module>
<module>messagebus-config</module>
<!-- PAX EXAM ITs -->
<feature version='${project.version}'>odl-netconf-connector-ssh</feature>
</feature>
+ <feature name='odl-message-bus' version='${project.version}'>
+ <!-- messagebus endpoint for netconf connector-->
+ <feature version='${project.version}'>odl-netconf-connector-all</feature>
+ <feature version='${mdsal.version}'>odl-message-bus-collector</feature>
+ <bundle>mvn:org.opendaylight.controller/messagebus-netconf/${netconf.version}</bundle>
+ <configfile finalname="${config.configfile.directory}/06-message-netconf.xml">mvn:org.opendaylight.controller/messagebus-netconf/${netconf.version}/xml/config</configfile>
+ </feature>
+
<feature name='odl-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf Connector :: Netconf Connector">
<feature version='${mdsal.version}'>odl-mdsal-broker</feature>
<feature version='${netconf.version}'>odl-netconf-client</feature>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-subsystem</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>messagebus-netconf</artifactId>
+ <name>${project.artifactId}</name>
+
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf-notifications</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-core-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-netconf-connector</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-config</artifactId>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-grizzly2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/config</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>attach-artifacts</id>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${project.build.directory}/classes/initial/06-message-netconf.xml</file>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+package org.opendaylight.controller.config.yang.messagebus.netconf;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.app.util.Providers;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.core.api.Broker;
+
+public class MessageBusNetconfModule extends org.opendaylight.controller.config.yang.messagebus.netconf.AbstractMessageBusNetconfModule {
+ public MessageBusNetconfModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public MessageBusNetconfModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.messagebus.netconf.MessageBusNetconfModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {}
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ final BindingAwareBroker.ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
+ final Broker.ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
+
+ final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class);
+ final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
+
+ final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
+ final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
+
+ return NetconfEventSourceManager.create(dataBroker, domPublish, domMount,
+ mountPointService, getEventSourceRegistryDependency(), getNamespaceToStream());
+ }
+
+}
--- /dev/null
+/*
+* Generated file
+*
+* Generated from: yang module name: messagebus-netconf yang module local name: messagebus-netconf
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Wed Jul 29 14:15:30 CEST 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.messagebus.netconf;
+public class MessageBusNetconfModuleFactory extends org.opendaylight.controller.config.yang.messagebus.netconf.AbstractMessageBusNetconfModuleFactory {
+
+}
*/
package org.opendaylight.controller.messagebus.eventsources.netconf;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.dom.DOMSource;
-
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class);
- public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath.create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status"));
- private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(EventSourceStatusNotification.QNAME);
+ public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath
+ .create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status"));
+ private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(
+ EventSourceStatusNotification.QNAME);
private static final String XMLNS_ATTRIBUTE_KEY = "xmlns";
private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/";
private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
- super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
+ super(NotificationSourceType.ConnectionStatusChange, SourceName,
+ EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener);
LOG.info("Connection notification source has been initialized.");
setActive(true);
setReplaySupported(false);
}
- @Override
- public void close() throws Exception {
- if(isActive()){
+ @Override public void close() throws Exception {
+ if (isActive()) {
LOG.debug("Connection notification - publish Deactive");
publishNotification(EventSourceStatus.Deactive);
notificationTopicMap.clear();
}
}
- @Override
- void activateNotificationSource() {
+ @Override void activateNotificationSource() {
LOG.debug("Connection notification - publish Active");
publishNotification(EventSourceStatus.Active);
}
- @Override
- void deActivateNotificationSource() {
+ @Override void deActivateNotificationSource() {
LOG.debug("Connection notification - publish Inactive");
publishNotification(EventSourceStatus.Inactive);
}
- @Override
- void reActivateNotificationSource() {
+ @Override void reActivateNotificationSource() {
LOG.debug("Connection notification - reactivate - publish active");
publishNotification(EventSourceStatus.Active);
}
- @Override
- boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
- if(checkNotificationPath(notificationPath) == false){
+ @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+ if (checkNotificationPath(notificationPath) == false) {
LOG.debug("Bad SchemaPath for notification try to register");
return false;
}
ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
- if(topicIds == null){
+ if (topicIds == null) {
topicIds = new ArrayList<>();
topicIds.add(topicId);
} else {
- if(topicIds.contains(topicId) == false){
+ if (topicIds.contains(topicId) == false) {
topicIds.add(topicId);
}
}
return true;
}
- @Override
- ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
+ @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
return notificationTopicMap.get(notificationPath);
}
- @Override
- synchronized void unRegisterNotificationTopic(TopicId topicId) {
+ @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
List<SchemaPath> notificationPathToRemove = new ArrayList<>();
- for(SchemaPath notifKey : notificationTopicMap.keySet()){
+ for (SchemaPath notifKey : notificationTopicMap.keySet()) {
ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
- if(topicList != null){
+ if (topicList != null) {
topicList.remove(topicId);
- if(topicList.isEmpty()){
+ if (topicList.isEmpty()) {
notificationPathToRemove.add(notifKey);
}
}
}
- for(SchemaPath notifKey : notificationPathToRemove){
+ for (SchemaPath notifKey : notificationPathToRemove) {
notificationTopicMap.remove(notifKey);
}
}
- private void publishNotification(EventSourceStatus eventSourceStatus){
+ private void publishNotification(EventSourceStatus eventSourceStatus) {
final EventSourceStatusNotification notification = new EventSourceStatusNotificationBuilder()
- .setStatus(eventSourceStatus)
- .build();
+ .setStatus(eventSourceStatus).build();
domNotificationListener.onNotification(createNotification(notification));
}
- private DOMNotification createNotification(EventSourceStatusNotification notification){
- final ContainerNode cn = Builders.containerBuilder()
- .withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
- .withChild(encapsulate(notification))
- .build();
+ private DOMNotification createNotification(EventSourceStatusNotification notification) {
+ final ContainerNode cn = Builders.containerBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
+ .withChild(encapsulate(notification)).build();
DOMNotification dn = new DOMNotification() {
- @Override
- public SchemaPath getType() {
+ @Override public SchemaPath getType() {
return EVENT_SOURCE_STATUS_PATH;
}
- @Override
- public ContainerNode getBody() {
+ @Override public ContainerNode getBody() {
return cn;
}
};
return dn;
}
- private AnyXmlNode encapsulate(EventSourceStatusNotification notification){
+ private AnyXmlNode encapsulate(EventSourceStatusNotification notification) {
DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder docBuilder;
Document doc = docBuilder.newDocument();
final Optional<String> namespace = Optional.of(EVENT_SOURCE_STATUS_ARG.getNodeType().getNamespace().toString());
- final Element rootElement = createElement(doc , "EventSourceStatusNotification", namespace);
+ final Element rootElement = createElement(doc, "EventSourceStatusNotification", namespace);
final Element sourceElement = doc.createElement("status");
sourceElement.appendChild(doc.createTextNode(notification.getStatus().name()));
rootElement.appendChild(sourceElement);
-
return Builders.anyXmlBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
- .withValue(new DOMSource(rootElement))
- .build();
+ .withValue(new DOMSource(rootElement)).build();
}
// Helper to create root XML element with correct namespace and attribute
private Element createElement(final Document document, final String qName, final Optional<String> namespaceURI) {
- if(namespaceURI.isPresent()) {
+ if (namespaceURI.isPresent()) {
final Element element = document.createElementNS(namespaceURI.get(), qName);
String name = XMLNS_ATTRIBUTE_KEY;
- if(element.getPrefix() != null) {
+ if (element.getPrefix() != null) {
name += ":" + element.getPrefix();
}
element.setAttributeNS(XMLNS_URI, name, namespaceURI.get());
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification;
+import org.opendaylight.controller.messagebus.app.util.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+public class NetconfEventSource implements EventSource, DOMNotificationListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
+
+ private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
+ private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(
+ QName.create(TopicNotification.QNAME, "node-id"));
+ private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(
+ QName.create(TopicNotification.QNAME, "topic-id"));
+ private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(
+ QName.create(TopicNotification.QNAME, "payload"));
+ private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
+
+ private final String nodeId;
+ private final Node node;
+
+ private final DOMMountPoint netconfMount;
+ private final MountPoint mountPoint;
+ private final DOMNotificationPublishService domPublish;
+
+ private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
+ private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
+
+ public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount,
+ final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
+ this.netconfMount = Preconditions.checkNotNull(netconfMount);
+ this.mountPoint = Preconditions.checkNotNull(mountPoint);
+ this.node = Preconditions.checkNotNull(node);
+ this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
+ this.domPublish = Preconditions.checkNotNull(publishService);
+ this.nodeId = node.getNodeId().getValue();
+ this.initializeNotificationTopicRegistrationList();
+
+ LOG.info("NetconfEventSource [{}] created.", this.nodeId);
+ }
+
+ private void initializeNotificationTopicRegistrationList() {
+ notificationTopicRegistrationList
+ .add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
+ Optional<Map<String, Stream>> streamMap = getAvailableStreams();
+ if (streamMap.isPresent()) {
+ LOG.debug("Stream configuration compare...");
+ for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
+ final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+ LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
+ if (streamMap.get().containsKey(streamName)) {
+ LOG.debug("Stream containig on device");
+ notificationTopicRegistrationList
+ .add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName), urnPrefix, this));
+ }
+ }
+ }
+ }
+
+ private Optional<Map<String, Stream>> getAvailableStreams() {
+
+ Map<String, Stream> streamMap = null;
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
+
+ if (dataBroker.isPresent()) {
+ LOG.debug("GET Available streams ...");
+ ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx
+ .read(LogicalDatastoreType.OPERATIONAL, pathStream);
+
+ try {
+ Optional<Streams> streams = checkFeature.checkedGet();
+ if (streams.isPresent()) {
+ streamMap = new HashMap<>();
+ for (Stream stream : streams.get().getStream()) {
+ LOG.debug("*** find stream {}", stream.getName().getValue());
+ streamMap.put(stream.getName().getValue(), stream);
+ }
+ }
+ } catch (ReadFailedException e) {
+ LOG.warn("Can not read streams for node {}", this.nodeId);
+ }
+
+ } else {
+ LOG.warn("No databroker on node {}", this.nodeId);
+ }
+
+ return Optional.fromNullable(streamMap);
+ }
+
+ @Override public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
+ LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
+ final NotificationPattern notificationPattern = input.getNotificationPattern();
+ final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
+ return registerTopic(input.getTopicId(), matchingNotifications);
+
+ }
+
+ @Override public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ reg.unRegisterNotificationTopic(input.getTopicId());
+ }
+ return Util.resultRpcSuccessFor((Void) null);
+ }
+
+ private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId,
+ final List<SchemaPath> notificationsToSubscribe) {
+ LOG.debug("Join topic {} - register", topicId);
+ JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
+ if (notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false) {
+ LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size());
+ final Optional<DOMNotificationService> notifyService = getDOMMountPoint()
+ .getService(DOMNotificationService.class);
+ if (notifyService.isPresent()) {
+ int registeredNotificationCount = 0;
+ for (SchemaPath schemaNotification : notificationsToSubscribe) {
+ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(),
+ schemaNotification.getLastComponent().getLocalName());
+ if (reg.checkNotificationPath(schemaNotification)) {
+ LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(),
+ topicId.getValue());
+ boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+ if (regSuccess) {
+ registeredNotificationCount = registeredNotificationCount + 1;
+ }
+ }
+ }
+ }
+ if (registeredNotificationCount > 0) {
+ joinTopicStatus = JoinTopicStatus.Up;
+ }
+ } else {
+ LOG.warn("NO DOMNotification service on node {}", this.nodeId);
+ }
+ } else {
+ LOG.debug("Notifications to subscribe has NOT found");
+ }
+
+ final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
+ return immediateFuture(RpcResultBuilder.success(output).build());
+
+ }
+
+ public void reActivateStreams() {
+ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
+ reg.reActivateNotificationSource();
+ }
+ }
+
+ public void deActivateStreams() {
+ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
+ reg.deActivateNotificationSource();
+ }
+ }
+
+ @Override public void onNotification(final DOMNotification notification) {
+ SchemaPath notificationPath = notification.getType();
+ Date notificationEventTime = null;
+ if (notification instanceof DOMEvent) {
+ notificationEventTime = ((DOMEvent) notification).getEventTime();
+ }
+ for (NotificationTopicRegistration notifReg : notificationTopicRegistrationList) {
+ ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
+ if (topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false) {
+
+ if (notifReg instanceof StreamNotificationTopicRegistration) {
+ StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration) notifReg;
+ streamReg.setLastEventTime(notificationEventTime);
+ }
+
+ for (TopicId topicId : topicIdsForNotification) {
+ publishNotification(notification, topicId);
+ LOG.debug("Notification {} has been published for TopicId {}", notification.getType(),
+ topicId.getValue());
+ }
+
+ }
+ }
+ }
+
+ private void publishNotification(final DOMNotification notification, TopicId topicId) {
+ final ContainerNode topicNotification = Builders.containerBuilder().withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
+ .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
+ .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)).withChild(encapsulate(notification))
+ .build();
+ try {
+ domPublish.putNotification(new TopicDOMNotification(topicNotification));
+ } catch (final InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private AnyXmlNode encapsulate(final DOMNotification body) {
+ // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
+ final Document doc = XmlUtil.newDocument();
+ final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
+ final Element element = XmlUtil.createElement(doc, "payload", namespace);
+
+ final DOMResult result = new DOMResult(element);
+
+ final SchemaContext context = getDOMMountPoint().getSchemaContext();
+ final SchemaPath schemaPath = body.getType();
+ try {
+ NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
+ return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG).withValue(new DOMSource(element)).build();
+ } catch (IOException | XMLStreamException e) {
+ LOG.error("Unable to encapsulate notification.", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern) {
+ // FIXME: default language should already be regex
+ final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+ final Pattern pattern = Pattern.compile(regex);
+ List<SchemaPath> availableNotifications = getAvailableNotifications();
+ if (availableNotifications == null || availableNotifications.isEmpty()) {
+ return null;
+ }
+ return Util.expandQname(availableNotifications, pattern);
+ }
+
+ @Override public void close() throws Exception {
+ for (NotificationTopicRegistration streamReg : notificationTopicRegistrationList) {
+ streamReg.close();
+ }
+ }
+
+ @Override public NodeKey getSourceNodeKey() {
+ return getNode().getKey();
+ }
+
+ @Override public List<SchemaPath> getAvailableNotifications() {
+
+ final List<SchemaPath> availNotifList = new ArrayList<>();
+ // add Event Source Connection status notification
+ availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+
+ // FIXME: use SchemaContextListener to get changes asynchronously
+ final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext()
+ .getNotifications();
+ // add all known notifications from netconf device
+ for (final NotificationDefinition nd : availableNotifications) {
+ availNotifList.add(nd.getPath());
+ }
+ return availNotifList;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ DOMMountPoint getDOMMountPoint() {
+ return netconfMount;
+ }
+
+ MountPoint getMountPoint() {
+ return mountPoint;
+ }
+
+ NetconfNode getNetconfNode() {
+ return node.getAugmentation(NetconfNode.class);
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class);
+ private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(
+ new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+ private static final InstanceIdentifier<Node> NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, NETCONF_TOPOLOGY_KEY).child(Node.class);
+
+ private final Map<String, String> streamMap;
+ private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
+ private final DOMNotificationPublishService publishService;
+ private final DOMMountPointService domMounts;
+ private final MountPointService mountPointService;
+ private ListenerRegistration<DataChangeListener> listenerRegistration;
+ private final EventSourceRegistry eventSourceRegistry;
+
+ public static NetconfEventSourceManager create(final DataBroker dataBroker,
+ final DOMNotificationPublishService domPublish, final DOMMountPointService domMount,
+ final MountPointService bindingMount, final EventSourceRegistry eventSourceRegistry,
+ final List<NamespaceToStream> namespaceMapping) {
+
+ final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(domPublish, domMount,
+ bindingMount, eventSourceRegistry, namespaceMapping);
+
+ eventSourceManager.initialize(dataBroker);
+
+ return eventSourceManager;
+
+ }
+
+ private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
+ final DOMMountPointService domMount, final MountPointService bindingMount,
+ final EventSourceRegistry eventSourceRegistry, final List<NamespaceToStream> namespaceMapping) {
+
+ Preconditions.checkNotNull(domPublish);
+ Preconditions.checkNotNull(domMount);
+ Preconditions.checkNotNull(bindingMount);
+ Preconditions.checkNotNull(eventSourceRegistry);
+ Preconditions.checkNotNull(namespaceMapping);
+ this.streamMap = namespaceToStreamMapping(namespaceMapping);
+ this.domMounts = domMount;
+ this.mountPointService = bindingMount;
+ this.publishService = domPublish;
+ this.eventSourceRegistry = eventSourceRegistry;
+ }
+
+ private void initialize(final DataBroker dataBroker) {
+ Preconditions.checkNotNull(dataBroker);
+ listenerRegistration = dataBroker
+ .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this,
+ DataChangeScope.SUBTREE);
+ LOG.info("NetconfEventSourceManager initialized.");
+ }
+
+ private Map<String, String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
+ final Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
+
+ for (final NamespaceToStream nToS : namespaceMapping) {
+ streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
+ }
+
+ return streamMap;
+ }
+
+ @Override public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+
+ LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
+ if (changeEntry.getValue() instanceof Node) {
+ nodeCreated(changeEntry.getKey(), (Node) changeEntry.getValue());
+ }
+ }
+
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+ if (changeEntry.getValue() instanceof Node) {
+ nodeUpdated(changeEntry.getKey(), (Node) changeEntry.getValue());
+ }
+ }
+
+ for (InstanceIdentifier<?> removePath : event.getRemovedPaths()) {
+ DataObject removeObject = event.getOriginalData().get(removePath);
+ if (removeObject instanceof Node) {
+ nodeRemoved(removePath);
+ }
+ }
+
+ }
+
+ private void nodeCreated(final InstanceIdentifier<?> key, final Node node) {
+ Preconditions.checkNotNull(key);
+ if (validateNode(node) == false) {
+ LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
+ return;
+ }
+ LOG.info("Netconf event source [{}] is creating...", key.toString());
+ NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this);
+ if (nesr != null) {
+ NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr);
+ if (nesrOld != null) {
+ nesrOld.close();
+ }
+ }
+ }
+
+ private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
+ Preconditions.checkNotNull(key);
+ if (validateNode(node) == false) {
+ LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
+ return;
+ }
+
+ LOG.info("Netconf event source [{}] is updating...", key.toString());
+ NetconfEventSourceRegistration nesr = registrationMap.get(key);
+ if (nesr != null) {
+ nesr.updateStatus();
+ } else {
+ nodeCreated(key, node);
+ }
+ }
+
+ private void nodeRemoved(final InstanceIdentifier<?> key) {
+ Preconditions.checkNotNull(key);
+ LOG.info("Netconf event source [{}] is removing...", key.toString());
+ NetconfEventSourceRegistration nesr = registrationMap.remove(key);
+ if (nesr != null) {
+ nesr.close();
+ }
+ }
+
+ private boolean validateNode(final Node node) {
+ if (node == null) {
+ return false;
+ }
+ return isNetconfNode(node);
+ }
+
+ Map<String, String> getStreamMap() {
+ return streamMap;
+ }
+
+ DOMNotificationPublishService getPublishService() {
+ return publishService;
+ }
+
+ DOMMountPointService getDomMounts() {
+ return domMounts;
+ }
+
+ EventSourceRegistry getEventSourceRegistry() {
+ return eventSourceRegistry;
+ }
+
+ MountPointService getMountPointService() {
+ return mountPointService;
+ }
+
+ private boolean isNetconfNode(final Node node) {
+ return node.getAugmentation(NetconfNode.class) != null;
+ }
+
+ @Override public void close() {
+ listenerRegistration.close();
+ for (final NetconfEventSourceRegistration reg : registrationMap.values()) {
+ reg.close();
+ }
+ registrationMap.clear();
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to keep connection status of netconf node and event source registration object
+ */
+public class NetconfEventSourceRegistration implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class);
+ private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
+ .node(NetworkTopology.QNAME).node(Topology.QNAME)
+ .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), TopologyNetconf.QNAME.getLocalName())
+ .node(Node.QNAME).build();
+ private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "node-id");
+ private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+ private final Node node;
+ private final InstanceIdentifier<?> instanceIdent;
+ private final NetconfEventSourceManager netconfEventSourceManager;
+ private ConnectionStatus currentNetconfConnStatus;
+ private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
+
+ public static NetconfEventSourceRegistration create(final InstanceIdentifier<?> instanceIdent, final Node node,
+ final NetconfEventSourceManager netconfEventSourceManager) {
+ Preconditions.checkNotNull(instanceIdent);
+ Preconditions.checkNotNull(node);
+ Preconditions.checkNotNull(netconfEventSourceManager);
+ if (isEventSource(node) == false) {
+ return null;
+ }
+ NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node,
+ netconfEventSourceManager);
+ nesr.updateStatus();
+ LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...", node.getNodeId().getValue());
+ return nesr;
+ }
+
+ private static boolean isEventSource(final Node node) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ if (netconfNode == null) {
+ return false;
+ }
+ if (netconfNode.getAvailableCapabilities() == null) {
+ return false;
+ }
+ final List<String> capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability();
+ if (capabilities == null || capabilities.isEmpty()) {
+ return false;
+ }
+ for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) {
+ if (capability.startsWith(NotificationCapabilityPrefix)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node,
+ final NetconfEventSourceManager netconfEventSourceManager) {
+ this.instanceIdent = instanceIdent;
+ this.node = node;
+ this.netconfEventSourceManager = netconfEventSourceManager;
+ this.eventSourceRegistration = null;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
+ return Optional.fromNullable(eventSourceRegistration);
+ }
+
+ NetconfNode getNetconfNode() {
+ return node.getAugmentation(NetconfNode.class);
+ }
+
+ void updateStatus() {
+ ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus();
+ LOG.info("Change status on node {}, new status is {}", this.node.getNodeId().getValue(), netconfConnStatus);
+ if (netconfConnStatus.equals(currentNetconfConnStatus)) {
+ return;
+ }
+ changeStatus(netconfConnStatus);
+ }
+
+ private boolean checkConnectionStatusType(ConnectionStatus status) {
+ if (status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting
+ || status == ConnectionStatus.UnableToConnect) {
+ return true;
+ }
+ return false;
+ }
+
+ private void changeStatus(ConnectionStatus newStatus) {
+ Preconditions.checkNotNull(newStatus);
+ if (checkConnectionStatusType(newStatus) == false) {
+ throw new IllegalStateException("Unknown new Netconf Connection Status");
+ }
+ if (this.currentNetconfConnStatus == null) {
+ if (newStatus == ConnectionStatus.Connected) {
+ registrationEventSource();
+ }
+ } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting) {
+ if (newStatus == ConnectionStatus.Connected) {
+ if (this.eventSourceRegistration == null) {
+ registrationEventSource();
+ } else {
+ // reactivate stream on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().reActivateStreams();
+ }
+ }
+ } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
+
+ if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) {
+ // deactivate streams on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().deActivateStreams();
+ }
+ } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect) {
+ if (newStatus == ConnectionStatus.Connected) {
+ if (this.eventSourceRegistration == null) {
+ registrationEventSource();
+ } else {
+ // reactivate stream on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().reActivateStreams();
+ }
+ }
+ } else {
+ throw new IllegalStateException("Unknown current Netconf Connection Status");
+ }
+ this.currentNetconfConnStatus = newStatus;
+ }
+
+ private void registrationEventSource() {
+ final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService()
+ .getMountPoint(instanceIdent);
+ final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts()
+ .getMountPoint(domMountPath(node.getNodeId()));
+ EventSourceRegistration<NetconfEventSource> registration = null;
+ if (domMountPoint.isPresent() && mountPoint.isPresent()) {
+ final NetconfEventSource netconfEventSource = new NetconfEventSource(node,
+ netconfEventSourceManager.getStreamMap(), domMountPoint.get(), mountPoint.get(),
+ netconfEventSourceManager.getPublishService());
+ registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
+ LOG.info("Event source {} has been registered", node.getNodeId().getValue());
+ }
+ this.eventSourceRegistration = registration;
+ }
+
+ private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
+ return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH)
+ .nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+ }
+
+ private void closeEventSourceRegistration() {
+ if (getEventSourceRegistration().isPresent()) {
+ getEventSourceRegistration().get().close();
+ }
+ }
+
+ @Override public void close() {
+ closeEventSourceRegistration();
+ }
+
+}
package org.opendaylight.controller.messagebus.eventsources.netconf;
import java.util.ArrayList;
-
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public abstract class NotificationTopicRegistration implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class);
- public enum NotificationSourceType{
+ public enum NotificationSourceType {
NetconfDeviceStream,
ConnectionStatusChange;
}
private final String notificationUrnPrefix;
private boolean replaySupported;
- protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) {
+ protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName,
+ String notificationUrnPrefix) {
this.notificationSourceType = notificationSourceType;
this.sourceName = sourceName;
this.notificationUrnPrefix = notificationUrnPrefix;
return notificationUrnPrefix;
}
- public boolean checkNotificationPath(SchemaPath notificationPath){
- if(notificationPath == null){
+ public boolean checkNotificationPath(SchemaPath notificationPath) {
+ if (notificationPath == null) {
return false;
}
String nameSpace = notificationPath.getLastComponent().toString();
- LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, getNotificationUrnPrefix());
+ LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace,
+ getNotificationUrnPrefix());
return nameSpace.startsWith(getNotificationUrnPrefix());
}
+
abstract void activateNotificationSource();
abstract void deActivateNotificationSource();
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
+ private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(
+ QName.create(CreateSubscriptionInput.QNAME, "stream"));
+ private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath
+ .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+ private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(
+ QName.create(CreateSubscriptionInput.QNAME, "startTime"));
+
+ final private DOMMountPoint domMountPoint;
+ final private String nodeId;
+ final private NetconfEventSource netconfEventSource;
+ final private Stream stream;
+ private Date lastEventTime;
+
+ private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+ public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
+ NetconfEventSource netconfEventSource) {
+ super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
+ this.domMountPoint = netconfEventSource.getDOMMountPoint();
+ this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
+ this.netconfEventSource = netconfEventSource;
+ this.stream = stream;
+ this.lastEventTime = null;
+ setReplaySupported(this.stream.isReplaySupport());
+ setActive(false);
+ LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
+ }
+
+ void activateNotificationSource() {
+ if (isActive() == false) {
+ LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
+ final ContainerNode input = Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build();
+ CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
+ .invokeRpc(CREATE_SUBSCRIPTION, input);
+ try {
+ csFuture.checkedGet();
+ setActive(true);
+ } catch (DOMRpcException e) {
+ LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+ setActive(false);
+ return;
+ }
+ } else {
+ LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
+ }
+ }
+
+ void reActivateNotificationSource() {
+ if (isActive()) {
+ LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
+ DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder = Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
+ if (isReplaySupported() && this.getLastEventTime() != null) {
+ inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
+ }
+ final ContainerNode input = inputBuilder.build();
+ CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
+ .invokeRpc(CREATE_SUBSCRIPTION, input);
+ try {
+ csFuture.checkedGet();
+ setActive(true);
+ } catch (DOMRpcException e) {
+ LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+ setActive(false);
+ return;
+ }
+ }
+ }
+
+ @Override void deActivateNotificationSource() {
+ // no operations need
+ }
+
+ private void closeStream() {
+ if (isActive()) {
+ for (ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()) {
+ reg.close();
+ }
+ notificationRegistrationMap.clear();
+ notificationTopicMap.clear();
+ setActive(false);
+ }
+ }
+
+ private String getStreamName() {
+ return getSourceName();
+ }
+
+ @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
+ return notificationTopicMap.get(notificationPath);
+ }
+
+ @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+
+ if (checkNotificationPath(notificationPath) == false) {
+ LOG.debug("Bad SchemaPath for notification try to register");
+ return false;
+ }
+
+ final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
+ if (notifyService.isPresent() == false) {
+ LOG.debug("DOMNotificationService is not present");
+ return false;
+ }
+
+ activateNotificationSource();
+ if (isActive() == false) {
+ LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
+ notificationPath.toString());
+ return false;
+ }
+
+ ListenerRegistration<NetconfEventSource> registration = notifyService.get()
+ .registerNotificationListener(this.netconfEventSource, notificationPath);
+ notificationRegistrationMap.put(notificationPath, registration);
+ ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+ if (topicIds == null) {
+ topicIds = new ArrayList<>();
+ topicIds.add(topicId);
+ } else {
+ if (topicIds.contains(topicId) == false) {
+ topicIds.add(topicId);
+ }
+ }
+
+ notificationTopicMap.put(notificationPath, topicIds);
+ return true;
+ }
+
+ @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
+ List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+ for (SchemaPath notifKey : notificationTopicMap.keySet()) {
+ ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+ if (topicList != null) {
+ topicList.remove(topicId);
+ if (topicList.isEmpty()) {
+ notificationPathToRemove.add(notifKey);
+ }
+ }
+ }
+ for (SchemaPath notifKey : notificationPathToRemove) {
+ notificationTopicMap.remove(notifKey);
+ ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+ if (reg != null) {
+ reg.close();
+ }
+ }
+ }
+
+ Optional<Date> getLastEventTime() {
+ return Optional.fromNullable(lastEventTime);
+ }
+
+ void setLastEventTime(Date lastEventTime) {
+ this.lastEventTime = lastEventTime;
+ }
+
+ @Override public void close() throws Exception {
+ closeStream();
+ }
+
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+ <configuration>
+ <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <name>messagebus-netconf</name>
+ <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">binding-impl:messagebus-netconf</type>
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-broker>
+ <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+ <name>binding-osgi-broker</name>
+ </binding-broker>
+ <event-source-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+ <type xmlns:mb-esr="urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry">mb-esr:event-source-registry</type>
+ <name>messagebus-app-impl</name>
+ </event-source-registry>
+ <namespace-to-stream xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+ <urn-prefix>urn:ietf:params:xml:ns:yang:smiv2</urn-prefix>
+ <stream-name>SNMP</stream-name>
+ </namespace-to-stream>
+ <namespace-to-stream xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+ <urn-prefix>urn:ietf:params:xml:ns:yang:ietf-syslog-notification</urn-prefix>
+ <stream-name>SYSLOG</stream-name>
+ </namespace-to-stream>
+ </module>
+ </modules>
+ </data>
+ </configuration>
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf?module=messagebus-netconf&revision=2015-07-28</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry?module=messagebus-event-source-registry&revision=2015-04-02</capability>
+ </required-capabilities>
+</snapshot>
--- /dev/null
+module messagebus-netconf {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf";
+ prefix "msgb-netconf";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import opendaylight-md-sal-binding {prefix sal;}
+ import opendaylight-md-sal-dom {prefix dom;}
+ import messagebus-event-source-registry {prefix esr;}
+
+ description
+ "Message bus netconf event source";
+
+ revision "2015-07-28" {
+ description "Message bus netconf event source initial definition";
+ }
+
+ identity messagebus-netconf {
+ base config:module-type;
+ config:java-name-prefix MessageBusNetconf;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case messagebus-netconf {
+ when "/config:modules/config:module/config:type = 'messagebus-netconf'";
+
+ container event-source-registry {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity esr:event-source-registry;
+ }
+ }
+ }
+
+ container dom-broker {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity dom:dom-broker-osgi-registry;
+ }
+ }
+ }
+
+ container binding-broker {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity sal:binding-broker-osgi-registry;
+ }
+ }
+ }
+
+ list namespace-to-stream {
+ key urn-prefix;
+
+ leaf urn-prefix {
+ type string;
+ }
+
+ leaf stream-name {
+ type string;
+ }
+ }
+
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class NetconfEventSourceManagerTest {
+
+ NetconfEventSourceManager netconfEventSourceManager;
+ ListenerRegistration listenerRegistrationMock;
+ DOMMountPointService domMountPointServiceMock;
+ MountPointService mountPointServiceMock;
+ EventSourceRegistry eventSourceTopologyMock;
+ AsyncDataChangeEvent asyncDataChangeEventMock;
+ RpcProviderRegistry rpcProviderRegistryMock;
+ EventSourceRegistry eventSourceRegistry;
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ DataBroker dataBrokerMock = mock(DataBroker.class);
+ DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+ domMountPointServiceMock = mock(DOMMountPointService.class);
+ mountPointServiceMock = mock(MountPointService.class);
+ eventSourceTopologyMock = mock(EventSourceRegistry.class);
+ rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+ eventSourceRegistry = mock(EventSourceRegistry.class);
+ List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
+
+ listenerRegistrationMock = mock(ListenerRegistration.class);
+ doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(
+ AsyncDataBroker.DataChangeScope.SUBTREE));
+
+ Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
+ doReturn(true).when(optionalDomMountServiceMock).isPresent();
+ doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+
+ DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
+ doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
+
+
+ Optional optionalBindingMountMock = mock(Optional.class);
+ doReturn(true).when(optionalBindingMountMock).isPresent();
+
+ MountPoint mountPointMock = mock(MountPoint.class);
+ doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
+ doReturn(mountPointMock).when(optionalBindingMountMock).get();
+
+ Optional optionalMpDataBroker = mock(Optional.class);
+ DataBroker mpDataBroker = mock(DataBroker.class);
+ doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+ doReturn(true).when(optionalMpDataBroker).isPresent();
+ doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+ ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+ doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+ Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+ doReturn(avStreams).when(checkFeature).checkedGet();
+
+ EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
+
+ netconfEventSourceManager =
+ NetconfEventSourceManager
+ .create(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
+ mountPointServiceMock, eventSourceRegistry, namespaceToStreamList);
+ }
+
+ @Test
+ public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception {
+ onDataChangedTestHelper(true,false,true, NetconfTestUtils.notification_capability_prefix);
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+ }
+
+ @Test
+ public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception {
+ onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix);
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+ }
+
+ @Test
+ public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception {
+ onDataChangedTestHelper(false,true,false, NetconfTestUtils.notification_capability_prefix);
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
+ }
+
+ @Test
+ public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception {
+ onDataChangedTestHelper(true,false,true,"bad-prefix");
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
+ }
+
+ private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{
+ asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+ Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
+ Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
+
+ Node node01;
+ String nodeId = "Node01";
+ doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
+ doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
+
+ if(isNetconf){
+ node01 = NetconfTestUtils
+ .getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix);
+
+ } else {
+ node01 = NetconfTestUtils.getNode(nodeId);
+ }
+
+ if(create){
+ mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+ }
+ if(update){
+ mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+ }
+
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.BindingService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class NetconfEventSourceTest {
+
+ NetconfEventSource netconfEventSource;
+ DOMMountPoint domMountPointMock;
+ MountPoint mountPointMock;
+ JoinTopicInput joinTopicInputMock;
+
+ @Before
+ public void setUp() throws Exception {
+ Map<String, String> streamMap = new HashMap<>();
+ streamMap.put("uriStr1", "string2");
+ domMountPointMock = mock(DOMMountPoint.class);
+ mountPointMock = mock(MountPoint.class);
+ DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+ RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
+ Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
+ NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+ doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+
+ Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
+ DataBroker mpDataBroker = mock(DataBroker.class);
+ doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+ doReturn(true).when(optionalMpDataBroker).isPresent();
+ doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+ ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+ doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+ Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+ doReturn(avStreams).when(checkFeature).checkedGet();
+
+ netconfEventSource = new NetconfEventSource(
+ NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected,
+ NetconfTestUtils.notification_capability_prefix),
+ streamMap,
+ domMountPointMock,
+ mountPointMock ,
+ domNotificationPublishServiceMock);
+
+ }
+
+ @Test
+ public void joinTopicTest() throws Exception{
+ joinTopicTestHelper();
+ assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+ }
+
+ private void joinTopicTestHelper() throws Exception{
+ joinTopicInputMock = mock(JoinTopicInput.class);
+ TopicId topicId = new TopicId("topicID007");
+ doReturn(topicId).when(joinTopicInputMock).getTopicId();
+ NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+ doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+ doReturn("uriStr1").when(notificationPatternMock).getValue();
+
+ SchemaContext schemaContextMock = mock(SchemaContext.class);
+ doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+ Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+ NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+ notificationDefinitionSet.add(notificationDefinitionMock);
+
+ URI uri = new URI("uriStr1");
+ QName qName = new QName(uri, "localName1");
+ org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+ doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+ doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+ Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+ doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+ doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+ DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+ doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+ ListenerRegistration<NetconfEventSource> listenerRegistrationMock = (ListenerRegistration<NetconfEventSource>)mock(ListenerRegistration.class);
+ doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class));
+
+ Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+ doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+ doReturn(true).when(optionalMock).isPresent();
+ DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+ doReturn(domRpcServiceMock).when(optionalMock).get();
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public final class NetconfTestUtils {
+
+ public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+ private NetconfTestUtils() {
+ }
+
+ public static Node getNetconfNode(String nodeIdent, String hostName, ConnectionStatus cs,
+ String notificationCapabilityPrefix) {
+
+ DomainName dn = new DomainName(hostName);
+ Host host = new Host(dn);
+
+ List<String> avCapList = new ArrayList<>();
+ avCapList.add(notificationCapabilityPrefix + "_availableCapabilityString1");
+ AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build();
+ NetconfNode nn = new NetconfNodeBuilder().setConnectionStatus(cs).setHost(host).setAvailableCapabilities(avCaps)
+ .build();
+
+ NodeId nodeId = new NodeId(nodeIdent);
+ NodeKey nk = new NodeKey(nodeId);
+ NodeBuilder nb = new NodeBuilder();
+ nb.setKey(nk);
+
+ nb.addAugmentation(NetconfNode.class, nn);
+ return nb.build();
+ }
+
+ public static Node getNode(String nodeIdent) {
+ NodeId nodeId = new NodeId(nodeIdent);
+ NodeKey nk = new NodeKey(nodeId);
+ NodeBuilder nb = new NodeBuilder();
+ nb.setKey(nk);
+ return nb.build();
+ }
+
+ public static InstanceIdentifier<Node> getInstanceIdentifier(Node node) {
+ TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+ InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, NETCONF_TOPOLOGY_KEY).child(Node.class, node.getKey());
+ return nodeII;
+ }
+
+ public static Optional<Streams> getAvailableStream(String Name, boolean replaySupport) {
+ Stream stream = new StreamBuilder().setName(new StreamNameType(Name)).setReplaySupport(replaySupport).build();
+ List<Stream> streamList = new ArrayList<>();
+ streamList.add(stream);
+ Streams streams = new StreamsBuilder().setStream(streamList).build();
+ return Optional.of(streams);
+ }
+
+}
<artifactId>sal-netconf-connector</artifactId>
<version>${mdsal.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>messagebus-netconf</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>features-netconf-connector</artifactId>
<module>netconf-notifications-impl</module>
<module>netconf-notifications-api</module>
<module>sal-netconf-connector</module>
+ <module>messagebus-netconf</module>
<module>features</module>
<module>models</module>
<module>tools</module>