}
}
+ typedef event-source-status {
+ type enumeration {
+ enum active;
+ enum inactive;
+ enum deactive;
+ }
+ description "Status of event source
+ - active: event source is publishing notification,
+ - inactive: event source stopped publishing of notifications temporarily
+ - deactive: event source stopped publishing of notifications permanently" ;
+ }
+
grouping topology-event-source-type {
container topology-event-source {
presence "indicates an event source-aware topology";
}
}
+ notification event-source-status-notification {
+
+ description
+ "Notification of change event source status.";
+
+ leaf status {
+ type event-source-status;
+ mandatory true;
+ description "Current status of event source.";
+ }
+
+ }
+
augment "/nt:network-topology/nt:topology/nt:topology-types" {
uses topology-event-source-type;
}
final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
- final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
-
+ final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class);
final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
- final NetconfEventSourceManager netconfEventSourceManager = NetconfEventSourceManager.create(dataBroker, domPublish,domMount, bindingMount, eventSourceRegistryWrapper, getNamespaceToStream());
+ final NetconfEventSourceManager netconfEventSourceManager
+ = NetconfEventSourceManager.create(dataBroker,
+ domPublish,
+ domMount,
+ mountPointService,
+ eventSourceRegistryWrapper,
+ getNamespaceToStream());
eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
LOGGER.info("Messagebus initialized");
return eventSourceRegistryWrapper;
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
final Node node = (Node) changeEntry.getValue();
- if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
+ if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) {
notifyNode(changeEntry.getKey());
}
}
return jti;
}
+ public Pattern getNodeIdRegexPattern() {
+ return nodeIdPattern;
+ }
+
}
deleteData(OPERATIONAL, augmentPath);
}
- private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
+ private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){
final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
@Override
public void onSuccess(Optional<Topology> data) {
if(data.isPresent()) {
+ LOG.info("Topology data are present...");
final List<Node> nodes = data.get().getNode();
+ if(nodes != null){
+ LOG.info("List of nodes is not null...");
+ final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern();
for (final Node node : nodes) {
if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
}
}
+ } else {
+ LOG.info("List of nodes is NULL...");
+ }
}
tx.close();
}
final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
final String nodeIdPattern = input.getNodeIdPattern().getValue();
- final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern);
final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
registerTopic(eventSourceTopic);
- notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+ notifyExistingNodes(eventSourceTopic);
final CreateTopicOutput cto = new CreateTopicOutputBuilder()
.setTopicId(eventSourceTopic.getTopicId())
insert(sourcePath);
for(EventSourceTopic est : topicListenerRegistrations.keySet()){
- est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
+ if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){
+ est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.dom.DOMSource;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatus;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatusNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatusNotificationBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class);
+
+ public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath.create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status"));
+ private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(EventSourceStatusNotification.QNAME);
+ private static final String XMLNS_ATTRIBUTE_KEY = "xmlns";
+ private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/";
+
+ private final DOMNotificationListener domNotificationListener;
+ private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+ public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
+ super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
+ this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener);
+ LOG.info("Connection notification source has been initialized...");
+ setActive(true);
+ setReplaySupported(false);
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.info("Connection notification - publish Deactive");
+ publishNotification(EventSourceStatus.Deactive);
+ notificationTopicMap.clear();
+ setActive(false);
+ }
+
+ @Override
+ void activateNotificationSource() {
+ LOG.info("Connection notification - publish Active");
+ publishNotification(EventSourceStatus.Active);
+ }
+
+ @Override
+ void deActivateNotificationSource() {
+ LOG.info("Connection notification - publish Inactive");
+ publishNotification(EventSourceStatus.Inactive);
+ }
+
+ @Override
+ void reActivateNotificationSource() {
+ LOG.info("Connection notification - reactivate - publish active");
+ publishNotification(EventSourceStatus.Active);
+ }
+
+ @Override
+ boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+ if(validateNotifactionSchemaPath(notificationPath) == false){
+ LOG.debug("Bad SchemaPath for notification try to register");
+ return false;
+ }
+ ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+ if(topicIds == null){
+ topicIds = new ArrayList<>();
+ topicIds.add(topicId);
+ } else {
+ if(topicIds.contains(topicId) == false){
+ topicIds.add(topicId);
+ }
+ }
+ notificationTopicMap.put(notificationPath, topicIds);
+ return true;
+ }
+
+ @Override
+ ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
+ return notificationTopicMap.get(notificationPath);
+ }
+
+ @Override
+ void unRegisterNotificationTopic(TopicId topicId) {
+ // TODO: need code when EventAggregator.destroyTopic will be implemented
+ }
+
+ private boolean validateNotifactionSchemaPath(SchemaPath notificationPath){
+ if(notificationPath == null){
+ return false;
+ }
+ URI notificationNameSpace = notificationPath.getLastComponent().getNamespace();
+ return getNotificationUrnPrefix().startsWith(notificationNameSpace.toString());
+ }
+
+ private void publishNotification(EventSourceStatus eventSourceStatus){
+
+ final EventSourceStatusNotification notification = new EventSourceStatusNotificationBuilder()
+ .setStatus(eventSourceStatus)
+ .build();
+ domNotificationListener.onNotification(createNotification(notification));
+ }
+
+ private DOMNotification createNotification(EventSourceStatusNotification notification){
+ final ContainerNode cn = Builders.containerBuilder()
+ .withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
+ .withChild(encapsulate(notification))
+ .build();
+ DOMNotification dn = new DOMNotification() {
+
+ @Override
+ public SchemaPath getType() {
+ return EVENT_SOURCE_STATUS_PATH;
+ }
+
+ @Override
+ public ContainerNode getBody() {
+ return cn;
+ }
+ };
+ return dn;
+ }
+
+ private AnyXmlNode encapsulate(EventSourceStatusNotification notification){
+
+ DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder docBuilder;
+
+ try {
+ docBuilder = docFactory.newDocumentBuilder();
+ } catch (ParserConfigurationException e) {
+ throw new IllegalStateException("Can not create XML DocumentBuilder");
+ }
+
+ Document doc = docBuilder.newDocument();
+
+ final Optional<String> namespace = Optional.of(EVENT_SOURCE_STATUS_ARG.getNodeType().getNamespace().toString());
+ final Element rootElement = createElement(doc , "EventSourceStatusNotification", namespace);
+
+ final Element sourceElement = doc.createElement("status");
+ sourceElement.appendChild(doc.createTextNode(notification.getStatus().name()));
+ rootElement.appendChild(sourceElement);
+
+
+ return Builders.anyXmlBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
+ .withValue(new DOMSource(rootElement))
+ .build();
+
+ }
+
+ // Helper to create root XML element with correct namespace and attribute
+ private Element createElement(final Document document, final String qName, final Optional<String> namespaceURI) {
+ if(namespaceURI.isPresent()) {
+ final Element element = document.createElementNS(namespaceURI.get(), qName);
+ String name = XMLNS_ATTRIBUTE_KEY;
+ if(element.getPrefix() != null) {
+ name += ":" + element.getPrefix();
+ }
+ element.setAttributeNS(XMLNS_URI, name, namespaceURI.get());
+ return element;
+ }
+ return document.createElement(qName);
+ }
+}
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
import org.opendaylight.controller.messagebus.app.impl.Util;
import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.w3c.dom.Element;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
-public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener {
+public class NetconfEventSource implements EventSource, DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id"));
private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
-
- private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
- private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+ private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
private final String nodeId;
private final Node node;
private final DOMMountPoint netconfMount;
+ private final MountPoint mountPoint;
private final DOMNotificationPublishService domPublish;
- private final Map<String, String> urnPrefixToStreamMap;
-
- private final ConcurrentHashMap<String, StreamNotificationTopicRegistration> streamNotifRegistrationMap = new ConcurrentHashMap<>();
+ private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
+ private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
- public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService) {
- this.netconfMount = netconfMount;
- this.node = node;
+ public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
+ this.netconfMount = Preconditions.checkNotNull(netconfMount);
+ this.mountPoint = Preconditions.checkNotNull(mountPoint);
+ this.node = Preconditions.checkNotNull(node);
+ this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
+ this.domPublish = Preconditions.checkNotNull(publishService);
this.nodeId = node.getNodeId().getValue();
- this.urnPrefixToStreamMap = streamMap;
- this.domPublish = publishService;
- this.initializeStreamNotifRegistrationMap();
- LOG.info("NetconfEventSource [{}] created.", nodeId);
+ this.initializeNotificationTopicRegistrationList();
+
+ LOG.info("NetconfEventSource [{}] created.", this.nodeId);
}
- private void initializeStreamNotifRegistrationMap(){
- for(String streamName : this.urnPrefixToStreamMap.values()){
- streamNotifRegistrationMap.put(streamName, new StreamNotificationTopicRegistration(streamName, this.nodeId, this.netconfMount, this));
+ private void initializeNotificationTopicRegistrationList() {
+ notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
+ Optional<Map<String, Stream>> streamMap = getAvailableStreams();
+ if(streamMap.isPresent()){
+ for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
+ final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+ if(streamMap.get().containsKey(streamName)){
+ notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
+ }
+ }
}
}
+ private Optional<Map<String, Stream>> getAvailableStreams(){
+
+ Map<String,Stream> streamMap = null;
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
+
+ if(dataBroker.isPresent()){
+
+ ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
+
+ try {
+ Optional<Streams> streams = checkFeature.checkedGet();
+ if(streams.isPresent()){
+ streamMap = new HashMap<>();
+ for(Stream stream : streams.get().getStream()){
+ streamMap.put(stream.getName().getValue(), stream);
+ }
+ }
+ } catch (ReadFailedException e) {
+ LOG.warn("Can not read streams for node {}",this.nodeId);
+ }
+
+ }
+
+ return Optional.fromNullable(streamMap);
+ }
+
@Override
public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
- final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+ final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
if(notifyService.isPresent()){
int subscribedStreams = 0;
for(SchemaPath schemaNotification : notificationsToSubscribe){
- final Optional<String> streamName = resolveStream(schemaNotification.getLastComponent());
- if(streamName.isPresent()){
- LOG.info("Stream {} is activating, TopicId {}", streamName.get(), topicId.getValue() );
- StreamNotificationTopicRegistration streamReg = streamNotifRegistrationMap.get(streamName.get());
- streamReg.activateStream();
- for(SchemaPath notificationPath : notificationsToSubscribe){
- LOG.info("Notification listener is registering, Notification {}, TopicId {}", notificationPath, topicId.getValue() );
- streamReg.registerNotificationListenerTopic(notificationPath, topicId);
- }
- subscribedStreams = subscribedStreams + 1;
- }
+ for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
+ LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
+ reg.activateNotificationSource();
+ boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+ if(regSuccess){
+ subscribedStreams = subscribedStreams +1;
+ }
+ }
}
if(subscribedStreams > 0){
joinTopicStatus = JoinTopicStatus.Up;
}
- private void resubscribeToActiveStreams() {
- for (StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){
- streamReg.reActivateStream();
+ public void reActivateStreams(){
+ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
+ reg.reActivateNotificationSource();
}
}
- private Optional<String> resolveStream(final QName qName) {
- String streamName = null;
-
- for (final Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
- final String nameSpace = qName.getNamespace().toString();
- final String urnPrefix = entry.getKey();
- if( nameSpace.startsWith(urnPrefix) ) {
- streamName = entry.getValue();
- break;
- }
+ public void deActivateStreams(){
+ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+ LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
+ reg.deActivateNotificationSource();
}
- return Optional.fromNullable(streamName);
}
@Override
public void onNotification(final DOMNotification notification) {
+ LOG.info("Notification {} has been arrived...",notification.getType());
SchemaPath notificationPath = notification.getType();
- LOG.info("Notification {} has come.",notification.getType());
- for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){
- for(TopicId topicId : streamReg.getNotificationTopicIds(notificationPath)){
- publishNotification(notification, topicId);
- LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+ Date notificationEventTime = null;
+ if(notification instanceof DOMEvent){
+ notificationEventTime = ((DOMEvent) notification).getEventTime();
+ }
+ for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){
+ ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
+ if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){
+
+ if(notifReg instanceof StreamNotificationTopicRegistration){
+ StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg;
+ streamReg.setLastEventTime(notificationEventTime);
+ }
+
+ for(TopicId topicId : topicIdsForNotification){
+ publishNotification(notification, topicId);
+ LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+ }
+
}
}
}
final ContainerNode topicNotification = Builders.containerBuilder()
.withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
.withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
- .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId))
+ .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId))
.withChild(encapsulate(notification))
.build();
try {
final DOMResult result = new DOMResult(element);
- final SchemaContext context = netconfMount.getSchemaContext();
+ final SchemaContext context = getDOMMountPoint().getSchemaContext();
final SchemaPath schemaPath = body.getType();
try {
NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
}
}
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- boolean wasConnected = false;
- boolean nowConnected = false;
-
- for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
- if ( isNetconfNode(changeEntry) ) {
- final NetconfNode nn = (NetconfNode)changeEntry.getValue();
- wasConnected = nn.isConnected();
- }
- }
-
- for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
- if ( isNetconfNode(changeEntry) ) {
- final NetconfNode nn = (NetconfNode)changeEntry.getValue();
- nowConnected = nn.isConnected();
- }
- }
-
- if (wasConnected == false && nowConnected == true) {
- resubscribeToActiveStreams();
- }
- }
-
- private static boolean isNetconfNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry ) {
- return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
- }
-
private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
// FIXME: default language should already be regex
final String regex = Util.wildcardToRegex(notificationPattern.getValue());
@Override
public void close() throws Exception {
- for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){
- streamReg.deactivateStream();
+ for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){
+ streamReg.close();
}
}
@Override
public NodeKey getSourceNodeKey(){
- return node.getKey();
+ return getNode().getKey();
}
@Override
public List<SchemaPath> getAvailableNotifications() {
+
+ final List<SchemaPath> availNotifList = new ArrayList<>();
+ // add Event Source Connection status notification
+ availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+
// FIXME: use SchemaContextListener to get changes asynchronously
- final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
- final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+ final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications();
+ // add all known notifications from netconf device
for (final NotificationDefinition nd : availableNotifications) {
- qNs.add(nd.getPath());
+ availNotifList.add(nd.getPath());
}
- return qNs;
+ return availNotifList;
}
- private class StreamNotificationTopicRegistration{
-
- final private String streamName;
- final private DOMMountPoint netconfMount;
- final private String nodeId;
- final private NetconfEventSource notificationListener;
- private boolean active;
-
- private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
- private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
-
- public StreamNotificationTopicRegistration(final String streamName, final String nodeId, final DOMMountPoint netconfMount, NetconfEventSource notificationListener) {
- this.streamName = streamName;
- this.netconfMount = netconfMount;
- this.nodeId = nodeId;
- this.notificationListener = notificationListener;
- this.active = false;
- }
-
- public boolean isActive() {
- return active;
- }
-
- public void reActivateStream(){
- if(this.isActive()){
- LOG.info("Stream {} is reactivated active on node {}.", this.streamName, this.nodeId);
- final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
- .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName))
- .build();
- netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
- }
- }
-
- public void activateStream() {
- if(this.isActive() == false){
- LOG.info("Stream {} is not active on node {}. Will subscribe.", this.streamName, this.nodeId);
- final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
- .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName))
- .build();
- netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
- this.active = true;
- } else {
- LOG.info("Stream {} is now active on node {}", this.streamName, this.nodeId);
- }
- }
-
- public void deactivateStream() {
- for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
- reg.close();
- }
- this.active = false;
- }
-
- public String getStreamName() {
- return streamName;
- }
+ public Node getNode() {
+ return node;
+ }
- public ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
- return notificationTopicMap.get(notificationPath);
- }
+ DOMMountPoint getDOMMountPoint() {
+ return netconfMount;
+ }
- public void registerNotificationListenerTopic(SchemaPath notificationPath, TopicId topicId){
- final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
- if(notificationPath != null && notifyService.isPresent()){
- ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.notificationListener,notificationPath);
- notificationRegistrationMap.put(notificationPath, registration);
- ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
- if(topicIds == null){
- topicIds = new ArrayList<>();
- topicIds.add(topicId);
- } else {
- if(topicIds.contains(topicId) == false){
- topicIds.add(topicId);
- }
- }
- notificationTopicMap.put(notificationPath, topicIds);
- }
- }
+ MountPoint getMountPoint() {
+ return mountPoint;
+ }
+ NetconfNode getNetconfNode(){
+ return node.getAugmentation(NetconfNode.class);
}
+
}
package org.opendaylight.controller.messagebus.eventsources.netconf;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
.child(Topology.class, NETCONF_TOPOLOGY_KEY)
.child(Node.class);
- private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
- .node(NetworkTopology.QNAME)
- .node(Topology.QNAME)
- .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
- .node(Node.QNAME)
- .build();
- private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
-
private final Map<String, String> streamMap;
- private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
private final DOMNotificationPublishService publishService;
private final DOMMountPointService domMounts;
- private final MountPointService bindingMounts;
+ private final MountPointService mountPointService;
private ListenerRegistration<DataChangeListener> listenerRegistration;
private final EventSourceRegistry eventSourceRegistry;
final List<NamespaceToStream> namespaceMapping){
final NetconfEventSourceManager eventSourceManager =
- new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
+ new NetconfEventSourceManager(domPublish, domMount,bindingMount, eventSourceRegistry, namespaceMapping);
eventSourceManager.initialize(dataBroker);
Preconditions.checkNotNull(namespaceMapping);
this.streamMap = namespaceToStreamMapping(namespaceMapping);
this.domMounts = domMount;
- this.bindingMounts = bindingMount;
+ this.mountPointService = bindingMount;
this.publishService = domPublish;
this.eventSourceRegistry = eventSourceRegistry;
}
@Override
public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+ LOG.info("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
- nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
+ nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue());
}
}
}
}
- }
+ for(InstanceIdentifier<?> removePath : event.getRemovedPaths()){
+ DataObject removeObject = event.getOriginalData().get(removePath);
+ if(removeObject instanceof Node){
+ nodeRemoved(removePath);
+ }
+ }
- private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
+ }
- // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
- if ( node == null ) {
- throw new IllegalStateException("Node is null");
- }
- if ( isNetconfNode(node) == false ) {
- LOG.debug("OnDataChanged Event. Not a Netconf node.");
+ private void nodeCreated(final InstanceIdentifier<?> key, final Node node){
+ Preconditions.checkNotNull(key);
+ if(validateNode(node) == false){
+ LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
return;
}
- if ( isEventSource(node) == false ) {
- LOG.debug("OnDataChanged Event. Node an EventSource node.");
- return;
+ LOG.info("Netconf event source [{}] is creating...", key.toString());
+ NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this);
+ if(nesr != null){
+ NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr);
+ if(nesrOld != null){
+ nesrOld.close();
+ }
}
- if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) {
+ }
+
+ private void nodeUpdated(final InstanceIdentifier<?> key, final Node node){
+ Preconditions.checkNotNull(key);
+ if(validateNode(node) == false){
+ LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
return;
}
- if(!eventSourceRegistration.containsKey(key)) {
- createEventSource(key,node);
+ LOG.info("Netconf event source [{}] is updating...", key.toString());
+ NetconfEventSourceRegistration nesr = registrationMap.get(key);
+ if(nesr != null){
+ nesr.updateStatus();
+ } else {
+ nodeCreated(key, node);
}
}
- private void createEventSource(final InstanceIdentifier<?> key, final Node node) {
- final Optional<DOMMountPoint> netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId()));
-
- if(netconfMount.isPresent()) {
- final NetconfEventSource netconfEventSource =
- new NetconfEventSource(node, streamMap, netconfMount.get(), publishService);
- final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
- LOG.info("Event source {} has been registered",node.getNodeId().getValue());
- eventSourceRegistration.putIfAbsent(key, registration);
-
+ private void nodeRemoved(final InstanceIdentifier<?> key){
+ Preconditions.checkNotNull(key);
+ LOG.info("Netconf event source [{}] is removing...", key.toString());
+ NetconfEventSourceRegistration nesr = registrationMap.remove(key);
+ if(nesr != null){
+ nesr.close();
}
}
- private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
- return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+ private boolean validateNode(final Node node){
+ if(node == null){
+ return false;
+ }
+ return isNetconfNode(node);
}
- private boolean isNetconfNode(final Node node) {
- return node.getAugmentation(NetconfNode.class) != null ;
+ Map<String, String> getStreamMap() {
+ return streamMap;
}
- private boolean isEventSource(final Node node) {
+ DOMNotificationPublishService getPublishService() {
+ return publishService;
+ }
- final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
- return isEventSource(netconfNode);
+ DOMMountPointService getDomMounts() {
+ return domMounts;
+ }
+ EventSourceRegistry getEventSourceRegistry() {
+ return eventSourceRegistry;
}
- private boolean isEventSource(final NetconfNode node) {
- if (node.getAvailableCapabilities() == null) {
- return false;
- }
- final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
- if(capabilities == null) {
- return false;
- }
- for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
- if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
- return true;
- }
- }
+ MountPointService getMountPointService() {
+ return mountPointService;
+ }
- return false;
+ private boolean isNetconfNode(final Node node) {
+ return node.getAugmentation(NetconfNode.class) != null ;
}
@Override
public void close() {
- for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+ listenerRegistration.close();
+ for(final NetconfEventSourceRegistration reg : registrationMap.values()){
reg.close();
}
- listenerRegistration.close();
+ registrationMap.clear();
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to keep connection status of netconf node and event source registration object
+ *
+ */
+public class NetconfEventSourceRegistration implements AutoCloseable{
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class);
+ private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
+ .node(NetworkTopology.QNAME)
+ .node(Topology.QNAME)
+ .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
+ .node(Node.QNAME)
+ .build();
+ private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
+ private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+ private final Node node;
+ private final InstanceIdentifier<?> instanceIdent;
+ private final NetconfEventSourceManager netconfEventSourceManager;
+ private ConnectionStatus currentNetconfConnStatus;
+ private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
+
+ public static NetconfEventSourceRegistration create(final InstanceIdentifier<?> instanceIdent, final Node node,
+ final NetconfEventSourceManager netconfEventSourceManager){
+ Preconditions.checkNotNull(instanceIdent);
+ Preconditions.checkNotNull(node);
+ Preconditions.checkNotNull(netconfEventSourceManager);
+ if(isEventSource(node) == false){
+ return null;
+ }
+ NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager);
+ nesr.updateStatus();
+ LOG.info("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
+ return nesr;
+ }
+
+ private static boolean isEventSource(final Node node) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ if(netconfNode == null){
+ return false;
+ }
+ if (netconfNode.getAvailableCapabilities() == null) {
+ return false;
+ }
+ final List<String> capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability();
+ if(capabilities == null || capabilities.isEmpty()) {
+ return false;
+ }
+ for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) {
+ if(capability.startsWith(NotificationCapabilityPrefix)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node, final NetconfEventSourceManager netconfEventSourceManager) {
+ this.instanceIdent = instanceIdent;
+ this.node = node;
+ this.netconfEventSourceManager = netconfEventSourceManager;
+ this.eventSourceRegistration =null;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
+ return Optional.fromNullable(eventSourceRegistration);
+ }
+
+ NetconfNode getNetconfNode(){
+ return node.getAugmentation(NetconfNode.class);
+ }
+
+ void updateStatus(){
+ ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus();
+ LOG.info("Change status on node {}, new status is {}",this.node.getNodeId().getValue(),netconfConnStatus);
+ if(netconfConnStatus.equals(currentNetconfConnStatus)){
+ return;
+ }
+ changeStatus(netconfConnStatus);
+ }
+
+ private boolean checkConnectionStatusType(ConnectionStatus status){
+ if( status == ConnectionStatus.Connected
+ || status == ConnectionStatus.Connecting
+ || status == ConnectionStatus.UnableToConnect){
+ return true;
+ }
+ return false;
+ }
+
+ private void changeStatus(ConnectionStatus newStatus){
+ Preconditions.checkNotNull(newStatus);
+ if(checkConnectionStatusType(newStatus) == false){
+ throw new IllegalStateException("Unknown new Netconf Connection Status");
+ }
+ if(this.currentNetconfConnStatus == null){
+ if (newStatus == ConnectionStatus.Connected){
+ registrationEventSource();
+ }
+ } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting){
+ if (newStatus == ConnectionStatus.Connected){
+ if(this.eventSourceRegistration == null){
+ registrationEventSource();
+ } else {
+ // reactivate stream on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().reActivateStreams();
+ }
+ }
+ } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
+
+ if(newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect){
+ // deactivate streams on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().deActivateStreams();
+ }
+ } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect){
+ if(newStatus == ConnectionStatus.Connected){
+ if(this.eventSourceRegistration == null){
+ registrationEventSource();
+ } else {
+ // reactivate stream on registered event source (invoke publish notification about connection)
+ this.eventSourceRegistration.getInstance().reActivateStreams();
+ }
+ }
+ } else {
+ throw new IllegalStateException("Unknown current Netconf Connection Status");
+ }
+ this.currentNetconfConnStatus = newStatus;
+ }
+
+ private void registrationEventSource(){
+ final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService().getMountPoint(instanceIdent);
+ final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts().getMountPoint(domMountPath(node.getNodeId()));
+ EventSourceRegistration<NetconfEventSource> registration = null;
+ if(domMountPoint.isPresent() && mountPoint.isPresent()) {
+ final NetconfEventSource netconfEventSource = new NetconfEventSource(
+ node,
+ netconfEventSourceManager.getStreamMap(),
+ domMountPoint.get(),
+ mountPoint.get(),
+ netconfEventSourceManager.getPublishService());
+ registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
+ LOG.info("Event source {} has been registered",node.getNodeId().getValue());
+ }
+ this.eventSourceRegistration = registration;
+ }
+
+ private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
+ return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+ }
+
+ private void closeEventSourceRegistration(){
+ if(getEventSourceRegistration().isPresent()){
+ getEventSourceRegistration().get().close();
+ }
+ }
+
+ @Override
+ public void close() {
+ closeEventSourceRegistration();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.ArrayList;
+
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+
+public abstract class NotificationTopicRegistration implements AutoCloseable {
+
+ public enum NotificationSourceType{
+ NetconfDeviceStream,
+ ConnectionStatusChange;
+ }
+
+ private boolean active;
+ private final NotificationSourceType notificationSourceType;
+ private final String sourceName;
+ private final String notificationUrnPrefix;
+ private boolean replaySupported;
+
+ protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) {
+ this.notificationSourceType = notificationSourceType;
+ this.sourceName = sourceName;
+ this.notificationUrnPrefix = notificationUrnPrefix;
+ this.active = false;
+ this.setReplaySupported(false);
+ }
+
+ public boolean isActive() {
+ return active;
+ }
+
+ protected void setActive(boolean active) {
+ this.active = active;
+ }
+
+ public NotificationSourceType getNotificationSourceType() {
+ return notificationSourceType;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public String getNotificationUrnPrefix() {
+ return notificationUrnPrefix;
+ }
+
+ abstract void activateNotificationSource();
+
+ abstract void deActivateNotificationSource();
+
+ abstract void reActivateNotificationSource();
+
+ abstract boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId);
+
+ abstract void unRegisterNotificationTopic(TopicId topicId);
+
+ abstract ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath);
+
+ public boolean isReplaySupported() {
+ return replaySupported;
+ }
+
+ protected void setReplaySupported(boolean replaySupported) {
+ this.replaySupported = replaySupported;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
+ private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
+ private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+ private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime"));
+
+ final private DOMMountPoint domMountPoint;
+ final private String nodeId;
+ final private NetconfEventSource netconfEventSource;
+ final private Stream stream;
+ private Date lastEventTime;
+
+ private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+ public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) {
+ super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
+ this.domMountPoint = netconfEventSource.getDOMMountPoint();
+ this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
+ this.netconfEventSource = netconfEventSource;
+ this.stream = stream;
+ this.lastEventTime= null;
+ setReplaySupported(this.stream.isReplaySupport());
+ setActive(false);
+ }
+
+ void activateNotificationSource() {
+ if(isActive() == false){
+ LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
+ final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()))
+ .build();
+ CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+ try {
+ csFuture.checkedGet();
+ setActive(true);
+ } catch (DOMRpcException e) {
+ LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+ setActive(false);
+ return;
+ }
+ } else {
+ LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
+ }
+ }
+
+ void reActivateNotificationSource(){
+ if(isActive()){
+ LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
+ DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder =
+ Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
+ if(isReplaySupported() && this.getLastEventTime() != null){
+ inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
+ }
+ final ContainerNode input = inputBuilder.build();
+ CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+ try {
+ csFuture.checkedGet();
+ setActive(true);
+ } catch (DOMRpcException e) {
+ LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+ setActive(false);
+ return;
+ }
+ }
+ }
+
+ @Override
+ void deActivateNotificationSource() {
+ // no operations need
+ }
+
+ private void closeStream() {
+ if(isActive()){
+ for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
+ reg.close();
+ }
+ notificationRegistrationMap.clear();
+ notificationTopicMap.clear();
+ setActive(false);
+ }
+ }
+
+ private String getStreamName() {
+ return getSourceName();
+ }
+
+ @Override
+ ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
+ return notificationTopicMap.get(notificationPath);
+ }
+
+ @Override
+ boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
+ if(validateNotificationPath(notificationPath) == false){
+ LOG.debug("Bad SchemaPath for notification try to register");
+ return false;
+ }
+ final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
+ if(notifyService.isPresent() == false){
+ LOG.debug("DOMNotificationService is not present");
+ return false;
+ }
+ ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
+ notificationRegistrationMap.put(notificationPath, registration);
+ ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+ if(topicIds == null){
+ topicIds = new ArrayList<>();
+ topicIds.add(topicId);
+ } else {
+ if(topicIds.contains(topicId) == false){
+ topicIds.add(topicId);
+ }
+ }
+ notificationTopicMap.put(notificationPath, topicIds);
+ return true;
+ }
+
+ private boolean validateNotificationPath(SchemaPath notificationPath){
+ if(notificationPath == null){
+ return false;
+ }
+ String nameSpace = notificationPath.getLastComponent().toString();
+ return nameSpace.startsWith(getNotificationUrnPrefix());
+ }
+
+ Optional<Date> getLastEventTime() {
+ return Optional.fromNullable(lastEventTime);
+ }
+
+
+ void setLastEventTime(Date lastEventTime) {
+ this.lastEventTime = lastEventTime;
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeStream();
+ }
+
+ @Override
+ void unRegisterNotificationTopic(TopicId topicId) {
+ // TODO: use it when destroy topic will be implemented
+ }
+
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-import org.opendaylight.controller.md.sal.binding.api.BindingService;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.MountPoint;
import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
public class NetconfEventSourceManagerTest {
- private static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
NetconfEventSourceManager netconfEventSourceManager;
ListenerRegistration listenerRegistrationMock;
DOMMountPointService domMountPointServiceMock;
listenerRegistrationMock = mock(ListenerRegistration.class);
doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
+ Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
+ doReturn(true).when(optionalDomMountServiceMock).isPresent();
+ doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+
+ DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
+ doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
+
+
+ Optional optionalBindingMountMock = mock(Optional.class);
+ doReturn(true).when(optionalBindingMountMock).isPresent();
+
+ MountPoint mountPointMock = mock(MountPoint.class);
+ doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
+ doReturn(mountPointMock).when(optionalBindingMountMock).get();
+
+ Optional optionalMpDataBroker = mock(Optional.class);
+ DataBroker mpDataBroker = mock(DataBroker.class);
+ doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+ doReturn(true).when(optionalMpDataBroker).isPresent();
+ doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+ ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+ doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+ Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+ doReturn(avStreams).when(checkFeature).checkedGet();
+
+ EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
+
netconfEventSourceManager =
NetconfEventSourceManager.create(dataBrokerMock,
domNotificationPublishServiceMock,
}
@Test
- public void onDataChangedCreateEventSourceTestByCreateEntry() throws InterruptedException, ExecutionException {
- onDataChangedTestHelper(true,false,true,notification_capability_prefix);
+ public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception {
+ onDataChangedTestHelper(true,false,true,NetconfTestUtils.notification_capability_prefix);
netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
}
@Test
- public void onDataChangedCreateEventSourceTestByUpdateEntry() throws InterruptedException, ExecutionException {
- onDataChangedTestHelper(false,true,true, notification_capability_prefix);
+ public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception {
+ onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix);
netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
}
@Test
- public void onDataChangedCreateEventSourceTestNotNeconf() throws InterruptedException, ExecutionException {
- onDataChangedTestHelper(false,true,false,notification_capability_prefix);
+ public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception {
+ onDataChangedTestHelper(false,true,false,NetconfTestUtils.notification_capability_prefix);
netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
}
@Test
- public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws InterruptedException, ExecutionException {
- onDataChangedTestHelper(false,true,true,"bad-prefix");
+ public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception {
+ onDataChangedTestHelper(true,false,true,"bad-prefix");
netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
}
- private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws InterruptedException, ExecutionException{
+ private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{
asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
- InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
- Node dataObjectMock = mock(Node.class);
-
- if(create){
- mapCreate.put(instanceIdentifierMock, dataObjectMock);
- }
- if(update){
- mapUpdate.put(instanceIdentifierMock, dataObjectMock);
- }
+ Node node01;
+ String nodeId = "Node01";
doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
- NetconfNode netconfNodeMock = mock(NetconfNode.class);
- AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+
if(isNetconf){
- doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
- doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
- List<String> availableCapabilityList = new ArrayList<>();
- availableCapabilityList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
- doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
- doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+ node01 = NetconfTestUtils.getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix);
+
} else {
- doReturn(null).when(dataObjectMock).getAugmentation(NetconfNode.class);
+ node01 = NetconfTestUtils.getNode(nodeId);
}
- Optional optionalMock = mock(Optional.class);
- Optional optionalBindingMountMock = mock(Optional.class);
- NodeId nodeId = new NodeId("nodeId1");
- doReturn(nodeId).when(dataObjectMock).getNodeId();
- doReturn(optionalMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
- doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
- doReturn(true).when(optionalMock).isPresent();
- doReturn(true).when(optionalBindingMountMock).isPresent();
-
- DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
- MountPoint mountPointMock = mock(MountPoint.class);
- doReturn(domMountPointMock).when(optionalMock).get();
- doReturn(mountPointMock).when(optionalBindingMountMock).get();
-
- RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
- Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
- NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+ if(create){
+ mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+ }
+ if(update){
+ mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+ }
- doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
- doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
- doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
- EventSourceRegistrationImpl esrMock = mock(EventSourceRegistrationImpl.class);
- doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
}
}
\ No newline at end of file
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
-//import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import java.net.URI;
import java.util.HashMap;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.BindingService;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSource;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
NetconfEventSource netconfEventSource;
DOMMountPoint domMountPointMock;
+ MountPoint mountPointMock;
JoinTopicInput joinTopicInputMock;
@Before
Map<String, String> streamMap = new HashMap<>();
streamMap.put("uriStr1", "string2");
domMountPointMock = mock(DOMMountPoint.class);
+ mountPointMock = mock(MountPoint.class);
DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
-
RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
NotificationsService notificationsServiceMock = mock(NotificationsService.class);
-
doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
- org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
- = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
- org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
- = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
- doReturn(nodeId).when(node).getNodeId();
- netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock);
- }
- @Test
- public void onDataChangedTest(){
- InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
- AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
- NetconfNode dataObjectMock = mock(NetconfNode.class);
- Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
- dataChangeMap.put(brmIdent, dataObjectMock);
- doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
- doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
- doReturn(true).when(dataObjectMock).isConnected();
- netconfEventSource.onDataChanged(asyncDataChangeEventMock);
- verify(dataObjectMock, times(2)).isConnected();
+ Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
+ DataBroker mpDataBroker = mock(DataBroker.class);
+ doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+ doReturn(true).when(optionalMpDataBroker).isPresent();
+ doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+ ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+ doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+ Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+ doReturn(avStreams).when(checkFeature).checkedGet();
+
+ netconfEventSource = new NetconfEventSource(
+ NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix),
+ streamMap,
+ domMountPointMock,
+ mountPointMock ,
+ domNotificationPublishServiceMock);
+
}
@Test
Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+ doReturn(true).when(optionalMock).isPresent();
DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
doReturn(domRpcServiceMock).when(optionalMock).get();
CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+
+public final class NetconfTestUtils {
+
+ public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+ private NetconfTestUtils() {
+ }
+
+ public static Node getNetconfNode(String nodeIdent,String hostName,ConnectionStatus cs, String notificationCapabilityPrefix){
+
+ DomainName dn = new DomainName(hostName);
+ Host host = new Host(dn);
+
+ List<String> avCapList = new ArrayList<>();
+ avCapList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
+ AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build();
+ NetconfNode nn = new NetconfNodeBuilder()
+ .setConnectionStatus(cs)
+ .setHost(host)
+ .setAvailableCapabilities(avCaps)
+ .build();
+
+ NodeId nodeId = new NodeId(nodeIdent);
+ NodeKey nk = new NodeKey(nodeId);
+ NodeBuilder nb = new NodeBuilder();
+ nb.setKey(nk);
+
+ nb.addAugmentation(NetconfNode.class, nn);
+ return nb.build();
+ }
+
+ public static Node getNode(String nodeIdent){
+ NodeId nodeId = new NodeId(nodeIdent);
+ NodeKey nk = new NodeKey(nodeId);
+ NodeBuilder nb = new NodeBuilder();
+ nb.setKey(nk);
+ return nb.build();
+ }
+
+ public static InstanceIdentifier<Node> getInstanceIdentifier(Node node){
+ TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+ InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, NETCONF_TOPOLOGY_KEY)
+ .child(Node.class, node.getKey());
+ return nodeII;
+ }
+
+ public static Optional<Streams> getAvailableStream(String Name, boolean replaySupport){
+ Stream stream = new StreamBuilder()
+ .setName(new StreamNameType(Name))
+ .setReplaySupport(replaySupport)
+ .build();
+ List<Stream> streamList = new ArrayList<>();
+ streamList.add(stream);
+ Streams streams = new StreamsBuilder().setStream(streamList).build();
+ return Optional.of(streams);
+ }
+
+}