From: Marian Adamjak Date: Wed, 6 May 2015 10:46:05 +0000 (+0200) Subject: BUG 3030 - reconnect netconf event source X-Git-Tag: release/beryllium~534 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=6c9c88f85589d635e3742cb2557044bf3a006d29 BUG 3030 - reconnect netconf event source - replay notification - publish notification about connection status (cherry picked from commit 4c147b35d298b281afccb53c7fb8b83b1b96ddfc) Change-Id: Ia6ead39a2e1a81135dcd86163fb7adb40a3d7d5c Signed-off-by: Marian Adamjak --- diff --git a/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang b/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang index 5dd416cde6..c56243b3fa 100644 --- a/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang +++ b/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang @@ -43,6 +43,18 @@ module event-source { } } + typedef event-source-status { + type enumeration { + enum active; + enum inactive; + enum deactive; + } + description "Status of event source + - active: event source is publishing notification, + - inactive: event source stopped publishing of notifications temporarily + - deactive: event source stopped publishing of notifications permanently" ; + } + grouping topology-event-source-type { container topology-event-source { presence "indicates an event source-aware topology"; @@ -72,6 +84,19 @@ module event-source { } } + notification event-source-status-notification { + + description + "Notification of change event source status."; + + leaf status { + type event-source-status; + mandatory true; + description "Current status of event source."; + } + + } + augment "/nt:network-topology/nt:topology/nt:topology-types" { uses topology-event-source-type; } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java index dd68714c96..803e89a57e 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java @@ -64,11 +64,16 @@ public class MessageBusAppImplModule extends org.opendaylight.controller.config. final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class); final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class); final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class); - final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class); final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class); - + final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class); final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry)); - final NetconfEventSourceManager netconfEventSourceManager = NetconfEventSourceManager.create(dataBroker, domPublish,domMount, bindingMount, eventSourceRegistryWrapper, getNamespaceToStream()); + final NetconfEventSourceManager netconfEventSourceManager + = NetconfEventSourceManager.create(dataBroker, + domPublish, + domMount, + mountPointService, + eventSourceRegistryWrapper, + getNamespaceToStream()); eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager); LOGGER.info("Messagebus initialized"); return eventSourceRegistryWrapper; diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java index c60562d3d4..6de407f58b 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java @@ -53,7 +53,7 @@ public class EventSourceTopic implements DataChangeListener { for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { if (changeEntry.getValue() instanceof Node) { final Node node = (Node) changeEntry.getValue(); - if (nodeIdPattern.matcher(node.getId().getValue()).matches()) { + if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) { notifyNode(changeEntry.getKey()); } } @@ -85,4 +85,8 @@ public class EventSourceTopic implements DataChangeListener { return jti; } + public Pattern getNodeIdRegexPattern() { + return nodeIdPattern; + } + } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 6140a78ba5..279528907c 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -129,7 +129,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR deleteData(OPERATIONAL, augmentPath); } - private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ + private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); @@ -140,12 +140,19 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR @Override public void onSuccess(Optional data) { if(data.isPresent()) { + LOG.info("Topology data are present..."); final List nodes = data.get().getNode(); + if(nodes != null){ + LOG.info("List of nodes is not null..."); + final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern(); for (final Node node : nodes) { if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); } } + } else { + LOG.info("List of nodes is NULL..."); + } } tx.close(); } @@ -168,12 +175,11 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); final String nodeIdPattern = input.getNodeIdPattern().getValue(); - final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern); final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); registerTopic(eventSourceTopic); - notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic); + notifyExistingNodes(eventSourceTopic); final CreateTopicOutput cto = new CreateTopicOutputBuilder() .setTopicId(eventSourceTopic.getTopicId()) @@ -213,7 +219,9 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR insert(sourcePath); for(EventSourceTopic est : topicListenerRegistrations.keySet()){ - est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey)); + if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){ + est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey)); + } } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java new file mode 100644 index 0000000000..2cbac7b97c --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.dom.DOMSource; + +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatus; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatusNotification; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatusNotificationBuilder; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class); + + public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath.create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status")); + private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(EventSourceStatusNotification.QNAME); + private static final String XMLNS_ATTRIBUTE_KEY = "xmlns"; + private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/"; + + private final DOMNotificationListener domNotificationListener; + private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); + + public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) { + super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString()); + this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener); + LOG.info("Connection notification source has been initialized..."); + setActive(true); + setReplaySupported(false); + } + + @Override + public void close() throws Exception { + LOG.info("Connection notification - publish Deactive"); + publishNotification(EventSourceStatus.Deactive); + notificationTopicMap.clear(); + setActive(false); + } + + @Override + void activateNotificationSource() { + LOG.info("Connection notification - publish Active"); + publishNotification(EventSourceStatus.Active); + } + + @Override + void deActivateNotificationSource() { + LOG.info("Connection notification - publish Inactive"); + publishNotification(EventSourceStatus.Inactive); + } + + @Override + void reActivateNotificationSource() { + LOG.info("Connection notification - reactivate - publish active"); + publishNotification(EventSourceStatus.Active); + } + + @Override + boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { + if(validateNotifactionSchemaPath(notificationPath) == false){ + LOG.debug("Bad SchemaPath for notification try to register"); + return false; + } + ArrayList topicIds = getNotificationTopicIds(notificationPath); + if(topicIds == null){ + topicIds = new ArrayList<>(); + topicIds.add(topicId); + } else { + if(topicIds.contains(topicId) == false){ + topicIds.add(topicId); + } + } + notificationTopicMap.put(notificationPath, topicIds); + return true; + } + + @Override + ArrayList getNotificationTopicIds(SchemaPath notificationPath) { + return notificationTopicMap.get(notificationPath); + } + + @Override + void unRegisterNotificationTopic(TopicId topicId) { + // TODO: need code when EventAggregator.destroyTopic will be implemented + } + + private boolean validateNotifactionSchemaPath(SchemaPath notificationPath){ + if(notificationPath == null){ + return false; + } + URI notificationNameSpace = notificationPath.getLastComponent().getNamespace(); + return getNotificationUrnPrefix().startsWith(notificationNameSpace.toString()); + } + + private void publishNotification(EventSourceStatus eventSourceStatus){ + + final EventSourceStatusNotification notification = new EventSourceStatusNotificationBuilder() + .setStatus(eventSourceStatus) + .build(); + domNotificationListener.onNotification(createNotification(notification)); + } + + private DOMNotification createNotification(EventSourceStatusNotification notification){ + final ContainerNode cn = Builders.containerBuilder() + .withNodeIdentifier(EVENT_SOURCE_STATUS_ARG) + .withChild(encapsulate(notification)) + .build(); + DOMNotification dn = new DOMNotification() { + + @Override + public SchemaPath getType() { + return EVENT_SOURCE_STATUS_PATH; + } + + @Override + public ContainerNode getBody() { + return cn; + } + }; + return dn; + } + + private AnyXmlNode encapsulate(EventSourceStatusNotification notification){ + + DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder; + + try { + docBuilder = docFactory.newDocumentBuilder(); + } catch (ParserConfigurationException e) { + throw new IllegalStateException("Can not create XML DocumentBuilder"); + } + + Document doc = docBuilder.newDocument(); + + final Optional namespace = Optional.of(EVENT_SOURCE_STATUS_ARG.getNodeType().getNamespace().toString()); + final Element rootElement = createElement(doc , "EventSourceStatusNotification", namespace); + + final Element sourceElement = doc.createElement("status"); + sourceElement.appendChild(doc.createTextNode(notification.getStatus().name())); + rootElement.appendChild(sourceElement); + + + return Builders.anyXmlBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG) + .withValue(new DOMSource(rootElement)) + .build(); + + } + + // Helper to create root XML element with correct namespace and attribute + private Element createElement(final Document document, final String qName, final Optional namespaceURI) { + if(namespaceURI.isPresent()) { + final Element element = document.createElementNS(namespaceURI.get(), qName); + String name = XMLNS_ATTRIBUTE_KEY; + if(element.getPrefix() != null) { + name += ":" + element.getPrefix(); + } + element.setAttributeNS(XMLNS_URI, name, namespaceURI.get()); + return element; + } + return document.createElement(qName); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java index 3dbdc98ea5..a640064751 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -12,10 +12,11 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -23,14 +24,17 @@ import javax.xml.stream.XMLStreamException; import javax.xml.transform.dom.DOMResult; import javax.xml.transform.dom.DOMSource; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMEvent; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification; import org.opendaylight.controller.messagebus.app.impl.Util; import org.opendaylight.controller.messagebus.spi.EventSource; @@ -43,12 +47,12 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -67,9 +71,11 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.CheckedFuture; -public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener { +public class NetconfEventSource implements EventSource, DOMNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); @@ -77,36 +83,71 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); - - private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); - private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; private final String nodeId; private final Node node; private final DOMMountPoint netconfMount; + private final MountPoint mountPoint; private final DOMNotificationPublishService domPublish; - private final Map urnPrefixToStreamMap; - - private final ConcurrentHashMap streamNotifRegistrationMap = new ConcurrentHashMap<>(); + private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName + private final List notificationTopicRegistrationList = new ArrayList<>(); - public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService) { - this.netconfMount = netconfMount; - this.node = node; + public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) { + this.netconfMount = Preconditions.checkNotNull(netconfMount); + this.mountPoint = Preconditions.checkNotNull(mountPoint); + this.node = Preconditions.checkNotNull(node); + this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); + this.domPublish = Preconditions.checkNotNull(publishService); this.nodeId = node.getNodeId().getValue(); - this.urnPrefixToStreamMap = streamMap; - this.domPublish = publishService; - this.initializeStreamNotifRegistrationMap(); - LOG.info("NetconfEventSource [{}] created.", nodeId); + this.initializeNotificationTopicRegistrationList(); + + LOG.info("NetconfEventSource [{}] created.", this.nodeId); } - private void initializeStreamNotifRegistrationMap(){ - for(String streamName : this.urnPrefixToStreamMap.values()){ - streamNotifRegistrationMap.put(streamName, new StreamNotificationTopicRegistration(streamName, this.nodeId, this.netconfMount, this)); + private void initializeNotificationTopicRegistrationList() { + notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); + Optional> streamMap = getAvailableStreams(); + if(streamMap.isPresent()){ + for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { + final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); + if(streamMap.get().containsKey(streamName)){ + notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this)); + } + } } } + private Optional> getAvailableStreams(){ + + Map streamMap = null; + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + Optional dataBroker = this.mountPoint.getService(DataBroker.class); + + if(dataBroker.isPresent()){ + + ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream); + + try { + Optional streams = checkFeature.checkedGet(); + if(streams.isPresent()){ + streamMap = new HashMap<>(); + for(Stream stream : streams.get().getStream()){ + streamMap.put(stream.getName().getValue(), stream); + } + } + } catch (ReadFailedException e) { + LOG.warn("Can not read streams for node {}",this.nodeId); + } + + } + + return Optional.fromNullable(streamMap); + } + @Override public Future> joinTopic(final JoinTopicInput input) { @@ -120,21 +161,18 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ - final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + final Optional notifyService = getDOMMountPoint().getService(DOMNotificationService.class); if(notifyService.isPresent()){ int subscribedStreams = 0; for(SchemaPath schemaNotification : notificationsToSubscribe){ - final Optional streamName = resolveStream(schemaNotification.getLastComponent()); - if(streamName.isPresent()){ - LOG.info("Stream {} is activating, TopicId {}", streamName.get(), topicId.getValue() ); - StreamNotificationTopicRegistration streamReg = streamNotifRegistrationMap.get(streamName.get()); - streamReg.activateStream(); - for(SchemaPath notificationPath : notificationsToSubscribe){ - LOG.info("Notification listener is registering, Notification {}, TopicId {}", notificationPath, topicId.getValue() ); - streamReg.registerNotificationListenerTopic(notificationPath, topicId); - } - subscribedStreams = subscribedStreams + 1; - } + for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ + LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); + reg.activateNotificationSource(); + boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); + if(regSuccess){ + subscribedStreams = subscribedStreams +1; + } + } } if(subscribedStreams > 0){ joinTopicStatus = JoinTopicStatus.Up; @@ -147,34 +185,42 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, } - private void resubscribeToActiveStreams() { - for (StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ - streamReg.reActivateStream(); + public void reActivateStreams(){ + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); + reg.reActivateNotificationSource(); } } - private Optional resolveStream(final QName qName) { - String streamName = null; - - for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { - final String nameSpace = qName.getNamespace().toString(); - final String urnPrefix = entry.getKey(); - if( nameSpace.startsWith(urnPrefix) ) { - streamName = entry.getValue(); - break; - } + public void deActivateStreams(){ + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); + reg.deActivateNotificationSource(); } - return Optional.fromNullable(streamName); } @Override public void onNotification(final DOMNotification notification) { + LOG.info("Notification {} has been arrived...",notification.getType()); SchemaPath notificationPath = notification.getType(); - LOG.info("Notification {} has come.",notification.getType()); - for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ - for(TopicId topicId : streamReg.getNotificationTopicIds(notificationPath)){ - publishNotification(notification, topicId); - LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); + Date notificationEventTime = null; + if(notification instanceof DOMEvent){ + notificationEventTime = ((DOMEvent) notification).getEventTime(); + } + for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){ + ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); + if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){ + + if(notifReg instanceof StreamNotificationTopicRegistration){ + StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg; + streamReg.setLastEventTime(notificationEventTime); + } + + for(TopicId topicId : topicIdsForNotification){ + publishNotification(notification, topicId); + LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); + } + } } } @@ -183,7 +229,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, final ContainerNode topicNotification = Builders.containerBuilder() .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) - .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)) .withChild(encapsulate(notification)) .build(); try { @@ -201,7 +247,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, final DOMResult result = new DOMResult(element); - final SchemaContext context = netconfMount.getSchemaContext(); + final SchemaContext context = getDOMMountPoint().getSchemaContext(); final SchemaPath schemaPath = body.getType(); try { NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); @@ -214,34 +260,6 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, } } - @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { - boolean wasConnected = false; - boolean nowConnected = false; - - for (final Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { - if ( isNetconfNode(changeEntry) ) { - final NetconfNode nn = (NetconfNode)changeEntry.getValue(); - wasConnected = nn.isConnected(); - } - } - - for (final Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { - if ( isNetconfNode(changeEntry) ) { - final NetconfNode nn = (NetconfNode)changeEntry.getValue(); - nowConnected = nn.isConnected(); - } - } - - if (wasConnected == false && nowConnected == true) { - resubscribeToActiveStreams(); - } - } - - private static boolean isNetconfNode(final Map.Entry, DataObject> changeEntry ) { - return NetconfNode.class.equals(changeEntry.getKey().getTargetType()); - } - private List getMatchingNotifications(NotificationPattern notificationPattern){ // FIXME: default language should already be regex final String regex = Util.wildcardToRegex(notificationPattern.getValue()); @@ -256,105 +274,46 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, @Override public void close() throws Exception { - for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ - streamReg.deactivateStream(); + for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){ + streamReg.close(); } } @Override public NodeKey getSourceNodeKey(){ - return node.getKey(); + return getNode().getKey(); } @Override public List getAvailableNotifications() { + + final List availNotifList = new ArrayList<>(); + // add Event Source Connection status notification + availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + // FIXME: use SchemaContextListener to get changes asynchronously - final Set availableNotifications = netconfMount.getSchemaContext().getNotifications(); - final List qNs = new ArrayList<>(availableNotifications.size()); + final Set availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications(); + // add all known notifications from netconf device for (final NotificationDefinition nd : availableNotifications) { - qNs.add(nd.getPath()); + availNotifList.add(nd.getPath()); } - return qNs; + return availNotifList; } - private class StreamNotificationTopicRegistration{ - - final private String streamName; - final private DOMMountPoint netconfMount; - final private String nodeId; - final private NetconfEventSource notificationListener; - private boolean active; - - private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); - private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); - - public StreamNotificationTopicRegistration(final String streamName, final String nodeId, final DOMMountPoint netconfMount, NetconfEventSource notificationListener) { - this.streamName = streamName; - this.netconfMount = netconfMount; - this.nodeId = nodeId; - this.notificationListener = notificationListener; - this.active = false; - } - - public boolean isActive() { - return active; - } - - public void reActivateStream(){ - if(this.isActive()){ - LOG.info("Stream {} is reactivated active on node {}.", this.streamName, this.nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName)) - .build(); - netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - } - } - - public void activateStream() { - if(this.isActive() == false){ - LOG.info("Stream {} is not active on node {}. Will subscribe.", this.streamName, this.nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName)) - .build(); - netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - this.active = true; - } else { - LOG.info("Stream {} is now active on node {}", this.streamName, this.nodeId); - } - } - - public void deactivateStream() { - for(ListenerRegistration reg : notificationRegistrationMap.values()){ - reg.close(); - } - this.active = false; - } - - public String getStreamName() { - return streamName; - } + public Node getNode() { + return node; + } - public ArrayList getNotificationTopicIds(SchemaPath notificationPath){ - return notificationTopicMap.get(notificationPath); - } + DOMMountPoint getDOMMountPoint() { + return netconfMount; + } - public void registerNotificationListenerTopic(SchemaPath notificationPath, TopicId topicId){ - final Optional notifyService = netconfMount.getService(DOMNotificationService.class); - if(notificationPath != null && notifyService.isPresent()){ - ListenerRegistration registration = notifyService.get().registerNotificationListener(this.notificationListener,notificationPath); - notificationRegistrationMap.put(notificationPath, registration); - ArrayList topicIds = getNotificationTopicIds(notificationPath); - if(topicIds == null){ - topicIds = new ArrayList<>(); - topicIds.add(topicId); - } else { - if(topicIds.contains(topicId) == false){ - topicIds.add(topicId); - } - } - notificationTopicMap.put(notificationPath, topicIds); - } - } + MountPoint getMountPoint() { + return mountPoint; + } + NetconfNode getNetconfNode(){ + return node.getAugmentation(NetconfNode.class); } + } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java index dd64e77073..180d3d4214 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.messagebus.eventsources.netconf; - import java.util.HashMap; import java.util.List; import java.util.Map; @@ -21,16 +20,12 @@ import org.opendaylight.controller.md.sal.binding.api.MountPointService; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; @@ -38,12 +33,9 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable { @@ -54,19 +46,11 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto .child(Topology.class, NETCONF_TOPOLOGY_KEY) .child(Node.class); - private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder() - .node(NetworkTopology.QNAME) - .node(Topology.QNAME) - .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName()) - .node(Node.QNAME) - .build(); - private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id"); - private final Map streamMap; - private final ConcurrentHashMap, EventSourceRegistration> eventSourceRegistration = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>(); private final DOMNotificationPublishService publishService; private final DOMMountPointService domMounts; - private final MountPointService bindingMounts; + private final MountPointService mountPointService; private ListenerRegistration listenerRegistration; private final EventSourceRegistry eventSourceRegistry; @@ -78,7 +62,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto final List namespaceMapping){ final NetconfEventSourceManager eventSourceManager = - new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping); + new NetconfEventSourceManager(domPublish, domMount,bindingMount, eventSourceRegistry, namespaceMapping); eventSourceManager.initialize(dataBroker); @@ -99,7 +83,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto Preconditions.checkNotNull(namespaceMapping); this.streamMap = namespaceToStreamMapping(namespaceMapping); this.domMounts = domMount; - this.bindingMounts = bindingMount; + this.mountPointService = bindingMount; this.publishService = domPublish; this.eventSourceRegistry = eventSourceRegistry; } @@ -123,10 +107,10 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - LOG.debug("[DataChangeEvent, DataObject>: {}]", event); + LOG.info("[DataChangeEvent, DataObject>: {}]", event); for (final Map.Entry, DataObject> changeEntry : event.getCreatedData().entrySet()) { if (changeEntry.getValue() instanceof Node) { - nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue()); + nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue()); } } @@ -136,82 +120,94 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto } } - } + for(InstanceIdentifier removePath : event.getRemovedPaths()){ + DataObject removeObject = event.getOriginalData().get(removePath); + if(removeObject instanceof Node){ + nodeRemoved(removePath); + } + } - private void nodeUpdated(final InstanceIdentifier key, final Node node) { + } - // we listen on node tree, therefore we should rather throw IllegalStateException when node is null - if ( node == null ) { - throw new IllegalStateException("Node is null"); - } - if ( isNetconfNode(node) == false ) { - LOG.debug("OnDataChanged Event. Not a Netconf node."); + private void nodeCreated(final InstanceIdentifier key, final Node node){ + Preconditions.checkNotNull(key); + if(validateNode(node) == false){ + LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString()); return; } - if ( isEventSource(node) == false ) { - LOG.debug("OnDataChanged Event. Node an EventSource node."); - return; + LOG.info("Netconf event source [{}] is creating...", key.toString()); + NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this); + if(nesr != null){ + NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr); + if(nesrOld != null){ + nesrOld.close(); + } } - if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) { + } + + private void nodeUpdated(final InstanceIdentifier key, final Node node){ + Preconditions.checkNotNull(key); + if(validateNode(node) == false){ + LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString()); return; } - if(!eventSourceRegistration.containsKey(key)) { - createEventSource(key,node); + LOG.info("Netconf event source [{}] is updating...", key.toString()); + NetconfEventSourceRegistration nesr = registrationMap.get(key); + if(nesr != null){ + nesr.updateStatus(); + } else { + nodeCreated(key, node); } } - private void createEventSource(final InstanceIdentifier key, final Node node) { - final Optional netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId())); - - if(netconfMount.isPresent()) { - final NetconfEventSource netconfEventSource = - new NetconfEventSource(node, streamMap, netconfMount.get(), publishService); - final EventSourceRegistration registration = eventSourceRegistry.registerEventSource(netconfEventSource); - LOG.info("Event source {} has been registered",node.getNodeId().getValue()); - eventSourceRegistration.putIfAbsent(key, registration); - + private void nodeRemoved(final InstanceIdentifier key){ + Preconditions.checkNotNull(key); + LOG.info("Netconf event source [{}] is removing...", key.toString()); + NetconfEventSourceRegistration nesr = registrationMap.remove(key); + if(nesr != null){ + nesr.close(); } } - private YangInstanceIdentifier domMountPath(final NodeId nodeId) { - return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build(); + private boolean validateNode(final Node node){ + if(node == null){ + return false; + } + return isNetconfNode(node); } - private boolean isNetconfNode(final Node node) { - return node.getAugmentation(NetconfNode.class) != null ; + Map getStreamMap() { + return streamMap; } - private boolean isEventSource(final Node node) { + DOMNotificationPublishService getPublishService() { + return publishService; + } - final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); - return isEventSource(netconfNode); + DOMMountPointService getDomMounts() { + return domMounts; + } + EventSourceRegistry getEventSourceRegistry() { + return eventSourceRegistry; } - private boolean isEventSource(final NetconfNode node) { - if (node.getAvailableCapabilities() == null) { - return false; - } - final List capabilities = node.getAvailableCapabilities().getAvailableCapability(); - if(capabilities == null) { - return false; - } - for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) { - if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) { - return true; - } - } + MountPointService getMountPointService() { + return mountPointService; + } - return false; + private boolean isNetconfNode(final Node node) { + return node.getAugmentation(NetconfNode.class) != null ; } @Override public void close() { - for(final EventSourceRegistration reg : eventSourceRegistration.values()){ + listenerRegistration.close(); + for(final NetconfEventSourceRegistration reg : registrationMap.values()){ reg.close(); } - listenerRegistration.close(); + registrationMap.clear(); } } \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java new file mode 100644 index 0000000000..891887e7f6 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import java.util.List; + +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +/** + * Helper class to keep connection status of netconf node and event source registration object + * + */ +public class NetconfEventSourceRegistration implements AutoCloseable{ + + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class); + private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder() + .node(NetworkTopology.QNAME) + .node(Topology.QNAME) + .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName()) + .node(Node.QNAME) + .build(); + private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id"); + private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification"; + + private final Node node; + private final InstanceIdentifier instanceIdent; + private final NetconfEventSourceManager netconfEventSourceManager; + private ConnectionStatus currentNetconfConnStatus; + private EventSourceRegistration eventSourceRegistration; + + public static NetconfEventSourceRegistration create(final InstanceIdentifier instanceIdent, final Node node, + final NetconfEventSourceManager netconfEventSourceManager){ + Preconditions.checkNotNull(instanceIdent); + Preconditions.checkNotNull(node); + Preconditions.checkNotNull(netconfEventSourceManager); + if(isEventSource(node) == false){ + return null; + } + NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager); + nesr.updateStatus(); + LOG.info("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue()); + return nesr; + } + + private static boolean isEventSource(final Node node) { + final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); + if(netconfNode == null){ + return false; + } + if (netconfNode.getAvailableCapabilities() == null) { + return false; + } + final List capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability(); + if(capabilities == null || capabilities.isEmpty()) { + return false; + } + for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) { + if(capability.startsWith(NotificationCapabilityPrefix)) { + return true; + } + } + + return false; + } + + private NetconfEventSourceRegistration(final InstanceIdentifier instanceIdent, final Node node, final NetconfEventSourceManager netconfEventSourceManager) { + this.instanceIdent = instanceIdent; + this.node = node; + this.netconfEventSourceManager = netconfEventSourceManager; + this.eventSourceRegistration =null; + } + + public Node getNode() { + return node; + } + + Optional> getEventSourceRegistration() { + return Optional.fromNullable(eventSourceRegistration); + } + + NetconfNode getNetconfNode(){ + return node.getAugmentation(NetconfNode.class); + } + + void updateStatus(){ + ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus(); + LOG.info("Change status on node {}, new status is {}",this.node.getNodeId().getValue(),netconfConnStatus); + if(netconfConnStatus.equals(currentNetconfConnStatus)){ + return; + } + changeStatus(netconfConnStatus); + } + + private boolean checkConnectionStatusType(ConnectionStatus status){ + if( status == ConnectionStatus.Connected + || status == ConnectionStatus.Connecting + || status == ConnectionStatus.UnableToConnect){ + return true; + } + return false; + } + + private void changeStatus(ConnectionStatus newStatus){ + Preconditions.checkNotNull(newStatus); + if(checkConnectionStatusType(newStatus) == false){ + throw new IllegalStateException("Unknown new Netconf Connection Status"); + } + if(this.currentNetconfConnStatus == null){ + if (newStatus == ConnectionStatus.Connected){ + registrationEventSource(); + } + } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting){ + if (newStatus == ConnectionStatus.Connected){ + if(this.eventSourceRegistration == null){ + registrationEventSource(); + } else { + // reactivate stream on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().reActivateStreams(); + } + } + } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) { + + if(newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect){ + // deactivate streams on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().deActivateStreams(); + } + } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect){ + if(newStatus == ConnectionStatus.Connected){ + if(this.eventSourceRegistration == null){ + registrationEventSource(); + } else { + // reactivate stream on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().reActivateStreams(); + } + } + } else { + throw new IllegalStateException("Unknown current Netconf Connection Status"); + } + this.currentNetconfConnStatus = newStatus; + } + + private void registrationEventSource(){ + final Optional mountPoint = netconfEventSourceManager.getMountPointService().getMountPoint(instanceIdent); + final Optional domMountPoint = netconfEventSourceManager.getDomMounts().getMountPoint(domMountPath(node.getNodeId())); + EventSourceRegistration registration = null; + if(domMountPoint.isPresent() && mountPoint.isPresent()) { + final NetconfEventSource netconfEventSource = new NetconfEventSource( + node, + netconfEventSourceManager.getStreamMap(), + domMountPoint.get(), + mountPoint.get(), + netconfEventSourceManager.getPublishService()); + registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource); + LOG.info("Event source {} has been registered",node.getNodeId().getValue()); + } + this.eventSourceRegistration = registration; + } + + private YangInstanceIdentifier domMountPath(final NodeId nodeId) { + return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build(); + } + + private void closeEventSourceRegistration(){ + if(getEventSourceRegistration().isPresent()){ + getEventSourceRegistration().get().close(); + } + } + + @Override + public void close() { + closeEventSourceRegistration(); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java new file mode 100644 index 0000000000..7812bd223d --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import java.util.ArrayList; + +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + + +public abstract class NotificationTopicRegistration implements AutoCloseable { + + public enum NotificationSourceType{ + NetconfDeviceStream, + ConnectionStatusChange; + } + + private boolean active; + private final NotificationSourceType notificationSourceType; + private final String sourceName; + private final String notificationUrnPrefix; + private boolean replaySupported; + + protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) { + this.notificationSourceType = notificationSourceType; + this.sourceName = sourceName; + this.notificationUrnPrefix = notificationUrnPrefix; + this.active = false; + this.setReplaySupported(false); + } + + public boolean isActive() { + return active; + } + + protected void setActive(boolean active) { + this.active = active; + } + + public NotificationSourceType getNotificationSourceType() { + return notificationSourceType; + } + + public String getSourceName() { + return sourceName; + } + + public String getNotificationUrnPrefix() { + return notificationUrnPrefix; + } + + abstract void activateNotificationSource(); + + abstract void deActivateNotificationSource(); + + abstract void reActivateNotificationSource(); + + abstract boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId); + + abstract void unRegisterNotificationTopic(TopicId topicId); + + abstract ArrayList getNotificationTopicIds(SchemaPath notificationPath); + + public boolean isReplaySupported() { + return replaySupported; + } + + protected void setReplaySupported(boolean replaySupported) { + this.replaySupported = replaySupported; + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java new file mode 100644 index 0000000000..e0d4fe21e1 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import java.util.ArrayList; +import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; + +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; + +public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { + + private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); + private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); + private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime")); + + final private DOMMountPoint domMountPoint; + final private String nodeId; + final private NetconfEventSource netconfEventSource; + final private Stream stream; + private Date lastEventTime; + + private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); + + public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { + super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); + this.domMountPoint = netconfEventSource.getDOMMountPoint(); + this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); + this.netconfEventSource = netconfEventSource; + this.stream = stream; + this.lastEventTime= null; + setReplaySupported(this.stream.isReplaySupport()); + setActive(false); + } + + void activateNotificationSource() { + if(isActive() == false){ + LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())) + .build(); + CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); + try { + csFuture.checkedGet(); + setActive(true); + } catch (DOMRpcException e) { + LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); + setActive(false); + return; + } + } else { + LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); + } + } + + void reActivateNotificationSource(){ + if(isActive()){ + LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); + DataContainerNodeAttrBuilder inputBuilder = + Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); + if(isReplaySupported() && this.getLastEventTime() != null){ + inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); + } + final ContainerNode input = inputBuilder.build(); + CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); + try { + csFuture.checkedGet(); + setActive(true); + } catch (DOMRpcException e) { + LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); + setActive(false); + return; + } + } + } + + @Override + void deActivateNotificationSource() { + // no operations need + } + + private void closeStream() { + if(isActive()){ + for(ListenerRegistration reg : notificationRegistrationMap.values()){ + reg.close(); + } + notificationRegistrationMap.clear(); + notificationTopicMap.clear(); + setActive(false); + } + } + + private String getStreamName() { + return getSourceName(); + } + + @Override + ArrayList getNotificationTopicIds(SchemaPath notificationPath){ + return notificationTopicMap.get(notificationPath); + } + + @Override + boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){ + if(validateNotificationPath(notificationPath) == false){ + LOG.debug("Bad SchemaPath for notification try to register"); + return false; + } + final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); + if(notifyService.isPresent() == false){ + LOG.debug("DOMNotificationService is not present"); + return false; + } + ListenerRegistration registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); + notificationRegistrationMap.put(notificationPath, registration); + ArrayList topicIds = getNotificationTopicIds(notificationPath); + if(topicIds == null){ + topicIds = new ArrayList<>(); + topicIds.add(topicId); + } else { + if(topicIds.contains(topicId) == false){ + topicIds.add(topicId); + } + } + notificationTopicMap.put(notificationPath, topicIds); + return true; + } + + private boolean validateNotificationPath(SchemaPath notificationPath){ + if(notificationPath == null){ + return false; + } + String nameSpace = notificationPath.getLastComponent().toString(); + return nameSpace.startsWith(getNotificationUrnPrefix()); + } + + Optional getLastEventTime() { + return Optional.fromNullable(lastEventTime); + } + + + void setLastEventTime(Date lastEventTime) { + this.lastEventTime = lastEventTime; + } + + @Override + public void close() throws Exception { + closeStream(); + } + + @Override + void unRegisterNotificationTopic(TopicId topicId) { + // TODO: use it when destroy topic will be implemented + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java similarity index 62% rename from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java rename to opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java index 1d6b825c9f..d592a8041d 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.messagebus.app.impl; +package org.opendaylight.controller.messagebus.eventsources.netconf; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -19,32 +19,30 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; -import org.opendaylight.controller.md.sal.binding.api.BindingService; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager; +import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; -import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; @@ -52,10 +50,10 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; public class NetconfEventSourceManagerTest { - private static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification"; NetconfEventSourceManager netconfEventSourceManager; ListenerRegistration listenerRegistrationMock; DOMMountPointService domMountPointServiceMock; @@ -82,6 +80,37 @@ public class NetconfEventSourceManagerTest { listenerRegistrationMock = mock(ListenerRegistration.class); doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE)); + Optional optionalDomMountServiceMock = (Optional) mock(Optional.class); + doReturn(true).when(optionalDomMountServiceMock).isPresent(); + doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); + + DOMMountPoint domMountPointMock = mock(DOMMountPoint.class); + doReturn(domMountPointMock).when(optionalDomMountServiceMock).get(); + + + Optional optionalBindingMountMock = mock(Optional.class); + doReturn(true).when(optionalBindingMountMock).isPresent(); + + MountPoint mountPointMock = mock(MountPoint.class); + doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class)); + doReturn(mountPointMock).when(optionalBindingMountMock).get(); + + Optional optionalMpDataBroker = mock(Optional.class); + DataBroker mpDataBroker = mock(DataBroker.class); + doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); + doReturn(true).when(optionalMpDataBroker).isPresent(); + doReturn(mpDataBroker).when(optionalMpDataBroker).get(); + + ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); + doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); + Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); + doReturn(avStreams).when(checkFeature).checkedGet(); + + EventSourceRegistration esrMock = mock(EventSourceRegistration.class); + netconfEventSourceManager = NetconfEventSourceManager.create(dataBrokerMock, domNotificationPublishServiceMock, @@ -92,85 +121,57 @@ public class NetconfEventSourceManagerTest { } @Test - public void onDataChangedCreateEventSourceTestByCreateEntry() throws InterruptedException, ExecutionException { - onDataChangedTestHelper(true,false,true,notification_capability_prefix); + public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception { + onDataChangedTestHelper(true,false,true,NetconfTestUtils.notification_capability_prefix); netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class)); } @Test - public void onDataChangedCreateEventSourceTestByUpdateEntry() throws InterruptedException, ExecutionException { - onDataChangedTestHelper(false,true,true, notification_capability_prefix); + public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception { + onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix); netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class)); } @Test - public void onDataChangedCreateEventSourceTestNotNeconf() throws InterruptedException, ExecutionException { - onDataChangedTestHelper(false,true,false,notification_capability_prefix); + public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception { + onDataChangedTestHelper(false,true,false,NetconfTestUtils.notification_capability_prefix); netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class)); } @Test - public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws InterruptedException, ExecutionException { - onDataChangedTestHelper(false,true,true,"bad-prefix"); + public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception { + onDataChangedTestHelper(true,false,true,"bad-prefix"); netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class)); } - private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws InterruptedException, ExecutionException{ + private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{ asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); Map mapCreate = new HashMap<>(); Map mapUpdate = new HashMap<>(); - InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); - Node dataObjectMock = mock(Node.class); - - if(create){ - mapCreate.put(instanceIdentifierMock, dataObjectMock); - } - if(update){ - mapUpdate.put(instanceIdentifierMock, dataObjectMock); - } + Node node01; + String nodeId = "Node01"; doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData(); doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData(); - NetconfNode netconfNodeMock = mock(NetconfNode.class); - AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class); + if(isNetconf){ - doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class); - doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities(); - List availableCapabilityList = new ArrayList<>(); - availableCapabilityList.add(notificationCapabilityPrefix +"_availableCapabilityString1"); - doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability(); - doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus(); + node01 = NetconfTestUtils.getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix); + } else { - doReturn(null).when(dataObjectMock).getAugmentation(NetconfNode.class); + node01 = NetconfTestUtils.getNode(nodeId); } - Optional optionalMock = mock(Optional.class); - Optional optionalBindingMountMock = mock(Optional.class); - NodeId nodeId = new NodeId("nodeId1"); - doReturn(nodeId).when(dataObjectMock).getNodeId(); - doReturn(optionalMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); - doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class)); - doReturn(true).when(optionalMock).isPresent(); - doReturn(true).when(optionalBindingMountMock).isPresent(); - - DOMMountPoint domMountPointMock = mock(DOMMountPoint.class); - MountPoint mountPointMock = mock(MountPoint.class); - doReturn(domMountPointMock).when(optionalMock).get(); - doReturn(mountPointMock).when(optionalBindingMountMock).get(); - - RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); - Optional onlyOptionalMock = (Optional) mock(Optional.class); - NotificationsService notificationsServiceMock = mock(NotificationsService.class); + if(create){ + mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01); + } + if(update){ + mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01); + } - doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class); - doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get(); - doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); - EventSourceRegistrationImpl esrMock = mock(EventSourceRegistrationImpl.class); - doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class)); } } \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java similarity index 71% rename from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java rename to opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java index 5e1a07062d..24fa57a7ab 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java @@ -5,15 +5,13 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.messagebus.app.impl; +package org.opendaylight.controller.messagebus.eventsources.netconf; + -//import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import java.net.URI; import java.util.HashMap; @@ -24,25 +22,25 @@ import java.util.Set; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.md.sal.binding.api.BindingService; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.md.sal.dom.api.DOMService; -import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSource; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -57,6 +55,7 @@ public class NetconfEventSourceTest { NetconfEventSource netconfEventSource; DOMMountPoint domMountPointMock; + MountPoint mountPointMock; JoinTopicInput joinTopicInputMock; @Before @@ -64,34 +63,34 @@ public class NetconfEventSourceTest { Map streamMap = new HashMap<>(); streamMap.put("uriStr1", "string2"); domMountPointMock = mock(DOMMountPoint.class); + mountPointMock = mock(MountPoint.class); DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); - RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); Optional onlyOptionalMock = (Optional) mock(Optional.class); NotificationsService notificationsServiceMock = mock(NotificationsService.class); - doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); - org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node - = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class); - org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId - = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1"); - doReturn(nodeId).when(node).getNodeId(); - netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock); - } - @Test - public void onDataChangedTest(){ - InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class) - .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class); - AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); - NetconfNode dataObjectMock = mock(NetconfNode.class); - Map dataChangeMap = new HashMap<>(); - dataChangeMap.put(brmIdent, dataObjectMock); - doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData(); - doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData(); - doReturn(true).when(dataObjectMock).isConnected(); - netconfEventSource.onDataChanged(asyncDataChangeEventMock); - verify(dataObjectMock, times(2)).isConnected(); + Optional optionalMpDataBroker = (Optional) mock(Optional.class); + DataBroker mpDataBroker = mock(DataBroker.class); + doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); + doReturn(true).when(optionalMpDataBroker).isPresent(); + doReturn(mpDataBroker).when(optionalMpDataBroker).get(); + + ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); + doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); + Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); + doReturn(avStreams).when(checkFeature).checkedGet(); + + netconfEventSource = new NetconfEventSource( + NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix), + streamMap, + domMountPointMock, + mountPointMock , + domNotificationPublishServiceMock); + } @Test @@ -131,10 +130,12 @@ public class NetconfEventSourceTest { Optional optionalMock = (Optional) mock(Optional.class); doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); + doReturn(true).when(optionalMock).isPresent(); DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); doReturn(domRpcServiceMock).when(optionalMock).get(); CheckedFuture checkedFutureMock = mock(CheckedFuture.class); doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); + } } \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java new file mode 100644 index 0000000000..5cc9f60220 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import java.util.ArrayList; +import java.util.List; + +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; + +import com.google.common.base.Optional; + +public final class NetconfTestUtils { + + public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification"; + + private NetconfTestUtils() { + } + + public static Node getNetconfNode(String nodeIdent,String hostName,ConnectionStatus cs, String notificationCapabilityPrefix){ + + DomainName dn = new DomainName(hostName); + Host host = new Host(dn); + + List avCapList = new ArrayList<>(); + avCapList.add(notificationCapabilityPrefix +"_availableCapabilityString1"); + AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build(); + NetconfNode nn = new NetconfNodeBuilder() + .setConnectionStatus(cs) + .setHost(host) + .setAvailableCapabilities(avCaps) + .build(); + + NodeId nodeId = new NodeId(nodeIdent); + NodeKey nk = new NodeKey(nodeId); + NodeBuilder nb = new NodeBuilder(); + nb.setKey(nk); + + nb.addAugmentation(NetconfNode.class, nn); + return nb.build(); + } + + public static Node getNode(String nodeIdent){ + NodeId nodeId = new NodeId(nodeIdent); + NodeKey nk = new NodeKey(nodeId); + NodeBuilder nb = new NodeBuilder(); + nb.setKey(nk); + return nb.build(); + } + + public static InstanceIdentifier getInstanceIdentifier(Node node){ + TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())); + InstanceIdentifier nodeII = InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class, NETCONF_TOPOLOGY_KEY) + .child(Node.class, node.getKey()); + return nodeII; + } + + public static Optional getAvailableStream(String Name, boolean replaySupport){ + Stream stream = new StreamBuilder() + .setName(new StreamNameType(Name)) + .setReplaySupport(replaySupport) + .build(); + List streamList = new ArrayList<>(); + streamList.add(stream); + Streams streams = new StreamsBuilder().setStream(streamList).build(); + return Optional.of(streams); + } + +}