BUG 3030 - reconnect netconf event source 98/21798/2
authorMarian Adamjak <madamjak@cisco.com>
Wed, 6 May 2015 10:46:05 +0000 (12:46 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Wed, 3 Jun 2015 16:42:46 +0000 (18:42 +0200)
   - replay notification
   - publish notification about connection status

(cherry picked from commit 4c147b35d298b281afccb53c7fb8b83b1b96ddfc)
Change-Id: Ia6ead39a2e1a81135dcd86163fb7adb40a3d7d5c
Signed-off-by: Marian Adamjak <madamjak@cisco.com>
13 files changed:
opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java [moved from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java with 62% similarity]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java [moved from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java with 71% similarity]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java [new file with mode: 0644]

index 5dd416cde61e89cc3bb200e327d09f8cd19ba2db..c56243b3fa9e7067ea934d6bcf37a6d4dd94b21f 100644 (file)
@@ -43,6 +43,18 @@ module event-source {
         }
     }
 
+    typedef event-source-status {
+        type enumeration {
+            enum active;
+            enum inactive;
+            enum deactive;
+        }
+        description "Status of event source
+                       - active: event source is publishing notification,
+                       - inactive: event source stopped publishing of notifications temporarily
+                       - deactive: event source stopped publishing of notifications permanently" ;
+    }
+
     grouping topology-event-source-type {
         container topology-event-source {
             presence "indicates an event source-aware topology";
@@ -72,6 +84,19 @@ module event-source {
         }
     }
 
+    notification event-source-status-notification {
+
+        description
+            "Notification of change event source status.";
+
+        leaf status {
+            type event-source-status;
+            mandatory true;
+            description "Current status of event source.";
+        }
+
+    }
+
     augment "/nt:network-topology/nt:topology/nt:topology-types" {
         uses topology-event-source-type;
     }
index dd68714c963490dcb3064a90335ae891a8a1a9d9..803e89a57eeb20e69d540afe8f42badbaf4ce858 100644 (file)
@@ -64,11 +64,16 @@ public class MessageBusAppImplModule extends org.opendaylight.controller.config.
         final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
         final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
         final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
-        final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
         final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
-
+        final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class);
         final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
-        final NetconfEventSourceManager netconfEventSourceManager = NetconfEventSourceManager.create(dataBroker, domPublish,domMount, bindingMount, eventSourceRegistryWrapper, getNamespaceToStream());
+        final NetconfEventSourceManager netconfEventSourceManager
+                = NetconfEventSourceManager.create(dataBroker,
+                        domPublish,
+                        domMount,
+                        mountPointService,
+                        eventSourceRegistryWrapper,
+                        getNamespaceToStream());
         eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
         LOGGER.info("Messagebus initialized");
         return eventSourceRegistryWrapper;
index c60562d3d4392dca7f5e1e3ca40da4e8bbe9961b..6de407f58be5d999961e82e03d572162ce2900d6 100644 (file)
@@ -53,7 +53,7 @@ public class EventSourceTopic implements DataChangeListener {
                 for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
                 final Node node = (Node) changeEntry.getValue();
-                if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
+                if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) {
                     notifyNode(changeEntry.getKey());
                 }
             }
@@ -85,4 +85,8 @@ public class EventSourceTopic implements DataChangeListener {
         return jti;
     }
 
+    public Pattern getNodeIdRegexPattern() {
+        return nodeIdPattern;
+    }
+
 }
index 6140a78ba586a7beaa89a92ae0ab9cc5fb109016..279528907c049d3f5cc172dac57e53658331b95a 100644 (file)
@@ -129,7 +129,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
         deleteData(OPERATIONAL, augmentPath);
     }
 
-    private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
+    private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){
 
         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
 
@@ -140,12 +140,19 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
             @Override
             public void onSuccess(Optional<Topology> data) {
                 if(data.isPresent()) {
+                    LOG.info("Topology data are present...");
                      final List<Node> nodes = data.get().getNode();
+                     if(nodes != null){
+                         LOG.info("List of nodes is not null...");
+                         final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern();
                      for (final Node node : nodes) {
                          if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
                              eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
                          }
                      }
+                     } else {
+                         LOG.info("List of nodes is NULL...");
+                     }
                 }
                 tx.close();
             }
@@ -168,12 +175,11 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
 
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
-        final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern);
         final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
 
         registerTopic(eventSourceTopic);
 
-        notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+        notifyExistingNodes(eventSourceTopic);
 
         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
                 .setTopicId(eventSourceTopic.getTopicId())
@@ -213,7 +219,9 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
         insert(sourcePath);
 
         for(EventSourceTopic est : topicListenerRegistrations.keySet()){
-            est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
+            if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){
+                est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
+            }
         }
     }
 
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java
new file mode 100644 (file)
index 0000000..2cbac7b
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.dom.DOMSource;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatus;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatusNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceStatusNotificationBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class);
+
+    public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath.create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status"));
+    private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(EventSourceStatusNotification.QNAME);
+    private static final String XMLNS_ATTRIBUTE_KEY = "xmlns";
+    private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/";
+
+    private final DOMNotificationListener domNotificationListener;
+    private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+    public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
+        super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
+        this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener);
+        LOG.info("Connection notification source has been initialized...");
+        setActive(true);
+        setReplaySupported(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Connection notification - publish Deactive");
+        publishNotification(EventSourceStatus.Deactive);
+        notificationTopicMap.clear();
+        setActive(false);
+    }
+
+    @Override
+    void activateNotificationSource() {
+        LOG.info("Connection notification - publish Active");
+        publishNotification(EventSourceStatus.Active);
+    }
+
+    @Override
+    void deActivateNotificationSource() {
+        LOG.info("Connection notification - publish Inactive");
+        publishNotification(EventSourceStatus.Inactive);
+    }
+
+    @Override
+    void reActivateNotificationSource() {
+        LOG.info("Connection notification - reactivate - publish active");
+        publishNotification(EventSourceStatus.Active);
+    }
+
+    @Override
+    boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+        if(validateNotifactionSchemaPath(notificationPath) == false){
+            LOG.debug("Bad SchemaPath for notification try to register");
+            return false;
+        }
+        ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+        if(topicIds == null){
+            topicIds = new ArrayList<>();
+            topicIds.add(topicId);
+        } else {
+            if(topicIds.contains(topicId) == false){
+                topicIds.add(topicId);
+            }
+        }
+        notificationTopicMap.put(notificationPath, topicIds);
+        return true;
+    }
+
+    @Override
+    ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
+        return notificationTopicMap.get(notificationPath);
+    }
+
+    @Override
+    void unRegisterNotificationTopic(TopicId topicId) {
+        // TODO: need code when EventAggregator.destroyTopic will be implemented
+    }
+
+    private boolean validateNotifactionSchemaPath(SchemaPath notificationPath){
+        if(notificationPath == null){
+            return false;
+        }
+        URI notificationNameSpace = notificationPath.getLastComponent().getNamespace();
+        return getNotificationUrnPrefix().startsWith(notificationNameSpace.toString());
+    }
+
+    private void publishNotification(EventSourceStatus eventSourceStatus){
+
+        final EventSourceStatusNotification notification = new EventSourceStatusNotificationBuilder()
+                    .setStatus(eventSourceStatus)
+                    .build();
+        domNotificationListener.onNotification(createNotification(notification));
+    }
+
+    private DOMNotification createNotification(EventSourceStatusNotification notification){
+        final ContainerNode cn = Builders.containerBuilder()
+                .withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
+                .withChild(encapsulate(notification))
+                .build();
+        DOMNotification dn = new DOMNotification() {
+
+            @Override
+            public SchemaPath getType() {
+                return EVENT_SOURCE_STATUS_PATH;
+            }
+
+            @Override
+            public ContainerNode getBody() {
+                return cn;
+            }
+        };
+        return dn;
+    }
+
+    private AnyXmlNode encapsulate(EventSourceStatusNotification notification){
+
+        DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder docBuilder;
+
+        try {
+            docBuilder = docFactory.newDocumentBuilder();
+        } catch (ParserConfigurationException e) {
+            throw new IllegalStateException("Can not create XML DocumentBuilder");
+        }
+
+        Document doc = docBuilder.newDocument();
+
+        final Optional<String> namespace = Optional.of(EVENT_SOURCE_STATUS_ARG.getNodeType().getNamespace().toString());
+        final Element rootElement = createElement(doc , "EventSourceStatusNotification", namespace);
+
+        final Element sourceElement = doc.createElement("status");
+        sourceElement.appendChild(doc.createTextNode(notification.getStatus().name()));
+        rootElement.appendChild(sourceElement);
+
+
+        return Builders.anyXmlBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
+                     .withValue(new DOMSource(rootElement))
+                     .build();
+
+    }
+
+    // Helper to create root XML element with correct namespace and attribute
+    private Element createElement(final Document document, final String qName, final Optional<String> namespaceURI) {
+        if(namespaceURI.isPresent()) {
+            final Element element = document.createElementNS(namespaceURI.get(), qName);
+            String name = XMLNS_ATTRIBUTE_KEY;
+            if(element.getPrefix() != null) {
+                name += ":" + element.getPrefix();
+            }
+            element.setAttributeNS(XMLNS_URI, name, namespaceURI.get());
+            return element;
+        }
+        return document.createElement(qName);
+    }
+}
index 3dbdc98ea52dd1c68fc7f15b2afcc955fd7887d2..a6400647514ebec952d2f344102fe42d7f44812c 100644 (file)
@@ -12,10 +12,11 @@ import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
@@ -23,14 +24,17 @@ import javax.xml.stream.XMLStreamException;
 import javax.xml.transform.dom.DOMResult;
 import javax.xml.transform.dom.DOMSource;
 
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
 import org.opendaylight.controller.messagebus.app.impl.Util;
 import org.opendaylight.controller.messagebus.spi.EventSource;
@@ -43,12 +47,12 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -67,9 +71,11 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
 
-public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener {
+public class NetconfEventSource implements EventSource, DOMNotificationListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
 
@@ -77,36 +83,71 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
     private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
     private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id"));
     private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
-
-    private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
-    private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+    private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
 
     private final String nodeId;
     private final Node node;
 
     private final DOMMountPoint netconfMount;
+    private final MountPoint mountPoint;
     private final DOMNotificationPublishService domPublish;
 
-    private final Map<String, String> urnPrefixToStreamMap;
-
-    private final ConcurrentHashMap<String, StreamNotificationTopicRegistration> streamNotifRegistrationMap = new ConcurrentHashMap<>();
+    private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
+    private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
 
-    public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService) {
-        this.netconfMount = netconfMount;
-        this.node = node;
+    public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
+        this.netconfMount = Preconditions.checkNotNull(netconfMount);
+        this.mountPoint = Preconditions.checkNotNull(mountPoint);
+        this.node = Preconditions.checkNotNull(node);
+        this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
+        this.domPublish = Preconditions.checkNotNull(publishService);
         this.nodeId = node.getNodeId().getValue();
-        this.urnPrefixToStreamMap = streamMap;
-        this.domPublish = publishService;
-        this.initializeStreamNotifRegistrationMap();
-        LOG.info("NetconfEventSource [{}] created.", nodeId);
+        this.initializeNotificationTopicRegistrationList();
+
+        LOG.info("NetconfEventSource [{}] created.", this.nodeId);
     }
 
-    private void initializeStreamNotifRegistrationMap(){
-        for(String streamName : this.urnPrefixToStreamMap.values()){
-            streamNotifRegistrationMap.put(streamName, new StreamNotificationTopicRegistration(streamName, this.nodeId, this.netconfMount, this));
+    private void initializeNotificationTopicRegistrationList() {
+        notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
+        Optional<Map<String, Stream>> streamMap = getAvailableStreams();
+        if(streamMap.isPresent()){
+            for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
+                final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+                if(streamMap.get().containsKey(streamName)){
+                    notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
+                }
+            }
         }
     }
 
+    private Optional<Map<String, Stream>> getAvailableStreams(){
+
+        Map<String,Stream> streamMap = null;
+        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+        Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
+
+        if(dataBroker.isPresent()){
+
+            ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
+            CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
+
+            try {
+                Optional<Streams> streams = checkFeature.checkedGet();
+                if(streams.isPresent()){
+                    streamMap = new HashMap<>();
+                    for(Stream stream : streams.get().getStream()){
+                        streamMap.put(stream.getName().getValue(), stream);
+                    }
+                }
+            } catch (ReadFailedException e) {
+                LOG.warn("Can not read streams for node {}",this.nodeId);
+            }
+
+        }
+
+        return Optional.fromNullable(streamMap);
+    }
+
     @Override
     public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
 
@@ -120,21 +161,18 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
 
         JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
         if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
-            final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+            final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
             if(notifyService.isPresent()){
                 int subscribedStreams = 0;
                 for(SchemaPath schemaNotification : notificationsToSubscribe){
-                    final Optional<String> streamName = resolveStream(schemaNotification.getLastComponent());
-                    if(streamName.isPresent()){
-                        LOG.info("Stream {} is activating, TopicId {}", streamName.get(), topicId.getValue() );
-                        StreamNotificationTopicRegistration streamReg = streamNotifRegistrationMap.get(streamName.get());
-                        streamReg.activateStream();
-                        for(SchemaPath notificationPath : notificationsToSubscribe){
-                            LOG.info("Notification listener is registering, Notification {}, TopicId {}", notificationPath, topicId.getValue() );
-                            streamReg.registerNotificationListenerTopic(notificationPath, topicId);
-                        }
-                        subscribedStreams = subscribedStreams + 1;
-                    }
+                   for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
+                      LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
+                      reg.activateNotificationSource();
+                      boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+                      if(regSuccess){
+                         subscribedStreams = subscribedStreams +1;
+                      }
+                   }
                 }
                 if(subscribedStreams > 0){
                     joinTopicStatus = JoinTopicStatus.Up;
@@ -147,34 +185,42 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
 
     }
 
-    private void resubscribeToActiveStreams() {
-        for (StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){
-            streamReg.reActivateStream();
+    public void reActivateStreams(){
+        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+           LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
+            reg.reActivateNotificationSource();
         }
     }
 
-    private Optional<String> resolveStream(final QName qName) {
-        String streamName = null;
-
-        for (final Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
-            final String nameSpace = qName.getNamespace().toString();
-            final String urnPrefix = entry.getKey();
-            if( nameSpace.startsWith(urnPrefix) ) {
-                streamName = entry.getValue();
-                break;
-            }
+    public void deActivateStreams(){
+        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+           LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
+            reg.deActivateNotificationSource();
         }
-        return Optional.fromNullable(streamName);
     }
 
     @Override
     public void onNotification(final DOMNotification notification) {
+        LOG.info("Notification {} has been arrived...",notification.getType());
         SchemaPath notificationPath = notification.getType();
-        LOG.info("Notification {} has come.",notification.getType());
-        for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){
-            for(TopicId topicId : streamReg.getNotificationTopicIds(notificationPath)){
-                publishNotification(notification, topicId);
-                LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+        Date notificationEventTime = null;
+        if(notification instanceof DOMEvent){
+            notificationEventTime = ((DOMEvent) notification).getEventTime();
+        }
+        for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){
+            ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
+            if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){
+
+                if(notifReg instanceof StreamNotificationTopicRegistration){
+                    StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg;
+                    streamReg.setLastEventTime(notificationEventTime);
+                }
+
+                for(TopicId topicId : topicIdsForNotification){
+                    publishNotification(notification, topicId);
+                    LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+                }
+
             }
         }
     }
@@ -183,7 +229,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
          final ContainerNode topicNotification = Builders.containerBuilder()
                  .withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
                  .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
-                 .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId))
+                 .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId))
                  .withChild(encapsulate(notification))
                  .build();
          try {
@@ -201,7 +247,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
 
         final DOMResult result = new DOMResult(element);
 
-        final SchemaContext context = netconfMount.getSchemaContext();
+        final SchemaContext context = getDOMMountPoint().getSchemaContext();
         final SchemaPath schemaPath = body.getType();
         try {
             NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
@@ -214,34 +260,6 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
         }
     }
 
-    @Override
-    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-        boolean wasConnected = false;
-        boolean nowConnected = false;
-
-        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
-            if ( isNetconfNode(changeEntry) ) {
-                final NetconfNode nn = (NetconfNode)changeEntry.getValue();
-                wasConnected = nn.isConnected();
-            }
-        }
-
-        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
-            if ( isNetconfNode(changeEntry) ) {
-                final NetconfNode nn = (NetconfNode)changeEntry.getValue();
-                nowConnected = nn.isConnected();
-            }
-        }
-
-        if (wasConnected == false && nowConnected == true) {
-            resubscribeToActiveStreams();
-        }
-    }
-
-    private static boolean isNetconfNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
-        return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
-    }
-
     private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
         // FIXME: default language should already be regex
         final String regex = Util.wildcardToRegex(notificationPattern.getValue());
@@ -256,105 +274,46 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener,
 
     @Override
     public void close() throws Exception {
-        for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){
-            streamReg.deactivateStream();
+        for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){
+            streamReg.close();
         }
     }
 
     @Override
     public NodeKey getSourceNodeKey(){
-        return node.getKey();
+        return getNode().getKey();
     }
 
     @Override
     public List<SchemaPath> getAvailableNotifications() {
+
+        final List<SchemaPath> availNotifList = new ArrayList<>();
+        // add Event Source Connection status notification
+        availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+
         // FIXME: use SchemaContextListener to get changes asynchronously
-        final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
-        final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+        final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications();
+        // add all known notifications from netconf device
         for (final NotificationDefinition nd : availableNotifications) {
-            qNs.add(nd.getPath());
+            availNotifList.add(nd.getPath());
         }
-        return qNs;
+        return availNotifList;
     }
 
-    private class StreamNotificationTopicRegistration{
-
-        final private String streamName;
-        final private DOMMountPoint netconfMount;
-        final private String nodeId;
-        final private NetconfEventSource notificationListener;
-        private boolean active;
-
-        private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
-        private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
-
-        public StreamNotificationTopicRegistration(final String streamName, final String nodeId, final DOMMountPoint netconfMount, NetconfEventSource notificationListener) {
-            this.streamName = streamName;
-            this.netconfMount = netconfMount;
-            this.nodeId = nodeId;
-            this.notificationListener = notificationListener;
-            this.active = false;
-        }
-
-        public boolean isActive() {
-            return active;
-        }
-
-        public void reActivateStream(){
-            if(this.isActive()){
-                LOG.info("Stream {} is reactivated active on node {}.", this.streamName, this.nodeId);
-                final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-                        .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName))
-                        .build();
-                netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-            }
-        }
-
-        public void activateStream() {
-            if(this.isActive() == false){
-                LOG.info("Stream {} is not active on node {}. Will subscribe.", this.streamName, this.nodeId);
-                final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-                        .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName))
-                        .build();
-                netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-                this.active = true;
-            } else {
-                LOG.info("Stream {} is now active on node {}", this.streamName, this.nodeId);
-            }
-        }
-
-        public void deactivateStream() {
-            for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
-                reg.close();
-            }
-            this.active = false;
-        }
-
-        public String getStreamName() {
-            return streamName;
-        }
+    public Node getNode() {
+        return node;
+    }
 
-        public ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
-            return notificationTopicMap.get(notificationPath);
-        }
+    DOMMountPoint getDOMMountPoint() {
+        return netconfMount;
+    }
 
-        public void registerNotificationListenerTopic(SchemaPath notificationPath, TopicId topicId){
-            final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
-            if(notificationPath != null && notifyService.isPresent()){
-                ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.notificationListener,notificationPath);
-                notificationRegistrationMap.put(notificationPath, registration);
-                ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
-                if(topicIds == null){
-                    topicIds = new ArrayList<>();
-                    topicIds.add(topicId);
-                } else {
-                    if(topicIds.contains(topicId) == false){
-                        topicIds.add(topicId);
-                    }
-                }
-                notificationTopicMap.put(notificationPath, topicIds);
-            }
-        }
+    MountPoint getMountPoint() {
+        return mountPoint;
+    }
 
+    NetconfNode getNetconfNode(){
+        return node.getAugmentation(NetconfNode.class);
     }
+
 }
index dd64e7707325527e6d4f8b1e0e99dc993a8f0c7f..180d3d421474636d879c8bd605b1e702d3191abf 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.messagebus.eventsources.netconf;
 
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -21,16 +20,12 @@ import org.opendaylight.controller.md.sal.binding.api.MountPointService;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
@@ -38,12 +33,9 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
@@ -54,19 +46,11 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
                 .child(Topology.class, NETCONF_TOPOLOGY_KEY)
                 .child(Node.class);
 
-    private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
-            .node(NetworkTopology.QNAME)
-            .node(Topology.QNAME)
-            .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
-            .node(Node.QNAME)
-            .build();
-    private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
-
     private final Map<String, String> streamMap;
-    private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
     private final DOMNotificationPublishService publishService;
     private final DOMMountPointService domMounts;
-    private final MountPointService bindingMounts;
+    private final MountPointService mountPointService;
     private ListenerRegistration<DataChangeListener> listenerRegistration;
     private final EventSourceRegistry eventSourceRegistry;
 
@@ -78,7 +62,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             final List<NamespaceToStream> namespaceMapping){
 
         final NetconfEventSourceManager eventSourceManager =
-                new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
+                new NetconfEventSourceManager(domPublish, domMount,bindingMount, eventSourceRegistry, namespaceMapping);
 
         eventSourceManager.initialize(dataBroker);
 
@@ -99,7 +83,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         Preconditions.checkNotNull(namespaceMapping);
         this.streamMap = namespaceToStreamMapping(namespaceMapping);
         this.domMounts = domMount;
-        this.bindingMounts = bindingMount;
+        this.mountPointService = bindingMount;
         this.publishService = domPublish;
         this.eventSourceRegistry = eventSourceRegistry;
     }
@@ -123,10 +107,10 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
 
-        LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+        LOG.info("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
-                nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
+                nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue());
             }
         }
 
@@ -136,82 +120,94 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             }
         }
 
-    }
+        for(InstanceIdentifier<?> removePath : event.getRemovedPaths()){
+            DataObject removeObject = event.getOriginalData().get(removePath);
+            if(removeObject instanceof Node){
+                nodeRemoved(removePath);
+            }
+        }
 
-    private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
+    }
 
-        // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
-        if ( node == null ) {
-            throw new IllegalStateException("Node is null");
-        }
-        if ( isNetconfNode(node) == false ) {
-            LOG.debug("OnDataChanged Event. Not a Netconf node.");
+    private void nodeCreated(final InstanceIdentifier<?> key, final Node node){
+        Preconditions.checkNotNull(key);
+        if(validateNode(node) == false){
+            LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
             return;
         }
-        if ( isEventSource(node) == false ) {
-            LOG.debug("OnDataChanged Event. Node an EventSource node.");
-            return;
+        LOG.info("Netconf event source [{}] is creating...", key.toString());
+        NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this);
+        if(nesr != null){
+            NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr);
+            if(nesrOld != null){
+                nesrOld.close();
+            }
         }
-        if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) {
+    }
+
+    private void nodeUpdated(final InstanceIdentifier<?> key, final Node node){
+        Preconditions.checkNotNull(key);
+        if(validateNode(node) == false){
+            LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
             return;
         }
 
-        if(!eventSourceRegistration.containsKey(key)) {
-            createEventSource(key,node);
+        LOG.info("Netconf event source [{}] is updating...", key.toString());
+        NetconfEventSourceRegistration nesr = registrationMap.get(key);
+        if(nesr != null){
+            nesr.updateStatus();
+        } else {
+            nodeCreated(key, node);
         }
     }
 
-    private void createEventSource(final InstanceIdentifier<?> key, final Node node) {
-        final Optional<DOMMountPoint> netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId()));
-
-        if(netconfMount.isPresent()) {
-            final NetconfEventSource netconfEventSource =
-                    new NetconfEventSource(node, streamMap, netconfMount.get(), publishService);
-            final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
-            LOG.info("Event source {} has been registered",node.getNodeId().getValue());
-            eventSourceRegistration.putIfAbsent(key, registration);
-
+    private void nodeRemoved(final InstanceIdentifier<?> key){
+        Preconditions.checkNotNull(key);
+        LOG.info("Netconf event source [{}] is removing...", key.toString());
+        NetconfEventSourceRegistration nesr = registrationMap.remove(key);
+        if(nesr != null){
+            nesr.close();
         }
     }
 
-    private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
-        return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+    private boolean validateNode(final Node node){
+        if(node == null){
+            return false;
+        }
+        return isNetconfNode(node);
     }
 
-    private boolean isNetconfNode(final Node node)  {
-        return node.getAugmentation(NetconfNode.class) != null ;
+    Map<String, String> getStreamMap() {
+        return streamMap;
     }
 
-    private boolean isEventSource(final Node node) {
+    DOMNotificationPublishService getPublishService() {
+        return publishService;
+    }
 
-        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
-        return isEventSource(netconfNode);
+    DOMMountPointService getDomMounts() {
+        return domMounts;
+    }
 
+    EventSourceRegistry getEventSourceRegistry() {
+        return eventSourceRegistry;
     }
 
-    private boolean isEventSource(final NetconfNode node) {
-        if (node.getAvailableCapabilities() == null) {
-            return false;
-        }
-        final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
-        if(capabilities == null) {
-             return false;
-        }
-        for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
-            if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
-                return true;
-            }
-        }
+    MountPointService getMountPointService() {
+        return mountPointService;
+    }
 
-        return false;
+    private boolean isNetconfNode(final Node node)  {
+        return node.getAugmentation(NetconfNode.class) != null ;
     }
 
     @Override
     public void close() {
-        for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+        listenerRegistration.close();
+        for(final NetconfEventSourceRegistration reg : registrationMap.values()){
             reg.close();
         }
-        listenerRegistration.close();
+        registrationMap.clear();
     }
 
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java
new file mode 100644 (file)
index 0000000..891887e
--- /dev/null
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to keep connection status of netconf node  and event source registration object
+ *
+ */
+public class NetconfEventSourceRegistration implements AutoCloseable{
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class);
+    private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
+            .node(NetworkTopology.QNAME)
+            .node(Topology.QNAME)
+            .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
+            .node(Node.QNAME)
+            .build();
+    private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
+    private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+    private final Node node;
+    private final InstanceIdentifier<?> instanceIdent;
+    private final NetconfEventSourceManager netconfEventSourceManager;
+    private ConnectionStatus currentNetconfConnStatus;
+    private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
+
+    public static NetconfEventSourceRegistration create(final InstanceIdentifier<?> instanceIdent, final Node node,
+                final NetconfEventSourceManager netconfEventSourceManager){
+        Preconditions.checkNotNull(instanceIdent);
+        Preconditions.checkNotNull(node);
+        Preconditions.checkNotNull(netconfEventSourceManager);
+        if(isEventSource(node) == false){
+            return null;
+        }
+        NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager);
+        nesr.updateStatus();
+        LOG.info("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
+        return nesr;
+    }
+
+    private static boolean isEventSource(final Node node) {
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+        if(netconfNode == null){
+            return false;
+        }
+        if (netconfNode.getAvailableCapabilities() == null) {
+            return false;
+        }
+        final List<String> capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability();
+        if(capabilities == null || capabilities.isEmpty()) {
+             return false;
+        }
+        for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) {
+            if(capability.startsWith(NotificationCapabilityPrefix)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node, final NetconfEventSourceManager netconfEventSourceManager) {
+        this.instanceIdent = instanceIdent;
+        this.node = node;
+        this.netconfEventSourceManager = netconfEventSourceManager;
+        this.eventSourceRegistration =null;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
+        return Optional.fromNullable(eventSourceRegistration);
+    }
+
+    NetconfNode getNetconfNode(){
+        return node.getAugmentation(NetconfNode.class);
+    }
+
+    void updateStatus(){
+        ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus();
+        LOG.info("Change status on node {}, new status is {}",this.node.getNodeId().getValue(),netconfConnStatus);
+        if(netconfConnStatus.equals(currentNetconfConnStatus)){
+            return;
+        }
+        changeStatus(netconfConnStatus);
+    }
+
+    private boolean checkConnectionStatusType(ConnectionStatus status){
+        if(    status == ConnectionStatus.Connected
+            || status == ConnectionStatus.Connecting
+            || status == ConnectionStatus.UnableToConnect){
+            return true;
+        }
+        return false;
+    }
+
+    private void changeStatus(ConnectionStatus newStatus){
+        Preconditions.checkNotNull(newStatus);
+        if(checkConnectionStatusType(newStatus) == false){
+            throw new IllegalStateException("Unknown new Netconf Connection Status");
+        }
+        if(this.currentNetconfConnStatus == null){
+            if (newStatus == ConnectionStatus.Connected){
+                registrationEventSource();
+            }
+        } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting){
+            if (newStatus == ConnectionStatus.Connected){
+                if(this.eventSourceRegistration == null){
+                    registrationEventSource();
+                } else {
+                    // reactivate stream on registered event source (invoke publish notification about connection)
+                    this.eventSourceRegistration.getInstance().reActivateStreams();
+                }
+            }
+        } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
+
+            if(newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect){
+                // deactivate streams on registered event source (invoke publish notification about connection)
+                this.eventSourceRegistration.getInstance().deActivateStreams();
+            }
+        } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect){
+            if(newStatus == ConnectionStatus.Connected){
+                if(this.eventSourceRegistration == null){
+                    registrationEventSource();
+                } else {
+                    // reactivate stream on registered event source (invoke publish notification about connection)
+                    this.eventSourceRegistration.getInstance().reActivateStreams();
+                }
+            }
+        } else {
+            throw new IllegalStateException("Unknown current Netconf Connection Status");
+        }
+        this.currentNetconfConnStatus = newStatus;
+    }
+
+    private void registrationEventSource(){
+        final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService().getMountPoint(instanceIdent);
+        final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts().getMountPoint(domMountPath(node.getNodeId()));
+        EventSourceRegistration<NetconfEventSource> registration = null;
+        if(domMountPoint.isPresent() && mountPoint.isPresent()) {
+            final NetconfEventSource netconfEventSource = new NetconfEventSource(
+                    node,
+                    netconfEventSourceManager.getStreamMap(),
+                    domMountPoint.get(),
+                    mountPoint.get(),
+                    netconfEventSourceManager.getPublishService());
+            registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
+            LOG.info("Event source {} has been registered",node.getNodeId().getValue());
+        }
+        this.eventSourceRegistration = registration;
+      }
+
+    private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
+        return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+    }
+
+    private void closeEventSourceRegistration(){
+        if(getEventSourceRegistration().isPresent()){
+            getEventSourceRegistration().get().close();
+        }
+    }
+
+    @Override
+    public void close() {
+        closeEventSourceRegistration();
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java
new file mode 100644 (file)
index 0000000..7812bd2
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.ArrayList;
+
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+
+public abstract class NotificationTopicRegistration implements AutoCloseable {
+
+    public enum NotificationSourceType{
+        NetconfDeviceStream,
+        ConnectionStatusChange;
+    }
+
+    private boolean active;
+    private final NotificationSourceType notificationSourceType;
+    private final String sourceName;
+    private final String notificationUrnPrefix;
+    private boolean replaySupported;
+
+    protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) {
+        this.notificationSourceType = notificationSourceType;
+        this.sourceName = sourceName;
+        this.notificationUrnPrefix = notificationUrnPrefix;
+        this.active = false;
+        this.setReplaySupported(false);
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+
+    protected void setActive(boolean active) {
+        this.active = active;
+    }
+
+    public NotificationSourceType getNotificationSourceType() {
+        return notificationSourceType;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public String getNotificationUrnPrefix() {
+        return notificationUrnPrefix;
+    }
+
+    abstract void activateNotificationSource();
+
+    abstract void deActivateNotificationSource();
+
+    abstract void reActivateNotificationSource();
+
+    abstract boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId);
+
+    abstract void unRegisterNotificationTopic(TopicId topicId);
+
+    abstract ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath);
+
+    public boolean isReplaySupported() {
+        return replaySupported;
+    }
+
+    protected void setReplaySupported(boolean replaySupported) {
+        this.replaySupported = replaySupported;
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java
new file mode 100644 (file)
index 0000000..e0d4fe2
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
+    private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
+    private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+    private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime"));
+
+    final private DOMMountPoint domMountPoint;
+    final private String nodeId;
+    final private NetconfEventSource netconfEventSource;
+    final private Stream stream;
+    private Date lastEventTime;
+
+    private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+    public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) {
+        super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
+        this.domMountPoint = netconfEventSource.getDOMMountPoint();
+        this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
+        this.netconfEventSource = netconfEventSource;
+        this.stream = stream;
+        this.lastEventTime= null;
+        setReplaySupported(this.stream.isReplaySupport());
+        setActive(false);
+    }
+
+    void activateNotificationSource() {
+        if(isActive() == false){
+            LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
+            final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+                    .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()))
+                    .build();
+            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+            try {
+                csFuture.checkedGet();
+                setActive(true);
+            } catch (DOMRpcException e) {
+                LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+                setActive(false);
+                return;
+            }
+        } else {
+            LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
+        }
+    }
+
+    void reActivateNotificationSource(){
+        if(isActive()){
+            LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
+            DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder =
+                    Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+                    .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
+            if(isReplaySupported() && this.getLastEventTime() != null){
+                inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
+            }
+            final ContainerNode input = inputBuilder.build();
+            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+            try {
+                csFuture.checkedGet();
+                setActive(true);
+            } catch (DOMRpcException e) {
+                LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+                setActive(false);
+                return;
+            }
+        }
+    }
+
+    @Override
+    void deActivateNotificationSource() {
+        // no operations need
+    }
+
+    private void closeStream() {
+        if(isActive()){
+            for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
+                reg.close();
+            }
+            notificationRegistrationMap.clear();
+            notificationTopicMap.clear();
+            setActive(false);
+        }
+    }
+
+    private String getStreamName() {
+        return getSourceName();
+    }
+
+    @Override
+    ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
+        return notificationTopicMap.get(notificationPath);
+    }
+
+    @Override
+    boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
+        if(validateNotificationPath(notificationPath) == false){
+            LOG.debug("Bad SchemaPath for notification try to register");
+            return false;
+        }
+        final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
+        if(notifyService.isPresent() == false){
+            LOG.debug("DOMNotificationService is not present");
+            return false;
+        }
+        ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
+        notificationRegistrationMap.put(notificationPath, registration);
+        ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+        if(topicIds == null){
+            topicIds = new ArrayList<>();
+            topicIds.add(topicId);
+        } else {
+            if(topicIds.contains(topicId) == false){
+                topicIds.add(topicId);
+            }
+        }
+        notificationTopicMap.put(notificationPath, topicIds);
+        return true;
+    }
+
+    private boolean validateNotificationPath(SchemaPath notificationPath){
+        if(notificationPath == null){
+            return false;
+        }
+        String nameSpace = notificationPath.getLastComponent().toString();
+        return nameSpace.startsWith(getNotificationUrnPrefix());
+    }
+
+    Optional<Date> getLastEventTime() {
+        return Optional.fromNullable(lastEventTime);
+    }
+
+
+    void setLastEventTime(Date lastEventTime) {
+        this.lastEventTime = lastEventTime;
+    }
+
+    @Override
+    public void close() throws Exception {
+        closeStream();
+    }
+
+    @Override
+    void unRegisterNotificationTopic(TopicId topicId) {
+        // TODO: use it when destroy topic will be implemented
+    }
+
+}
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -19,32 +19,30 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-import org.opendaylight.controller.md.sal.binding.api.BindingService;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
 import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -52,10 +50,10 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 public class NetconfEventSourceManagerTest {
 
-    private static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
     NetconfEventSourceManager netconfEventSourceManager;
     ListenerRegistration listenerRegistrationMock;
     DOMMountPointService domMountPointServiceMock;
@@ -82,6 +80,37 @@ public class NetconfEventSourceManagerTest {
         listenerRegistrationMock = mock(ListenerRegistration.class);
         doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
 
+        Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
+        doReturn(true).when(optionalDomMountServiceMock).isPresent();
+        doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+
+        DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
+        doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
+
+
+        Optional optionalBindingMountMock = mock(Optional.class);
+        doReturn(true).when(optionalBindingMountMock).isPresent();
+
+        MountPoint mountPointMock = mock(MountPoint.class);
+        doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
+        doReturn(mountPointMock).when(optionalBindingMountMock).get();
+
+        Optional optionalMpDataBroker = mock(Optional.class);
+        DataBroker mpDataBroker = mock(DataBroker.class);
+        doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+        doReturn(true).when(optionalMpDataBroker).isPresent();
+        doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+        ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+        doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+        CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+        doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+        Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+        doReturn(avStreams).when(checkFeature).checkedGet();
+
+        EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
+
         netconfEventSourceManager =
                 NetconfEventSourceManager.create(dataBrokerMock,
                         domNotificationPublishServiceMock,
@@ -92,85 +121,57 @@ public class NetconfEventSourceManagerTest {
     }
 
     @Test
-    public void onDataChangedCreateEventSourceTestByCreateEntry() throws InterruptedException, ExecutionException {
-        onDataChangedTestHelper(true,false,true,notification_capability_prefix);
+    public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception {
+        onDataChangedTestHelper(true,false,true,NetconfTestUtils.notification_capability_prefix);
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
         verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedCreateEventSourceTestByUpdateEntry() throws InterruptedException, ExecutionException {
-        onDataChangedTestHelper(false,true,true, notification_capability_prefix);
+    public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception {
+        onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix);
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
         verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedCreateEventSourceTestNotNeconf() throws InterruptedException, ExecutionException {
-        onDataChangedTestHelper(false,true,false,notification_capability_prefix);
+    public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception {
+        onDataChangedTestHelper(false,true,false,NetconfTestUtils.notification_capability_prefix);
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
         verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws InterruptedException, ExecutionException {
-        onDataChangedTestHelper(false,true,true,"bad-prefix");
+    public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception {
+        onDataChangedTestHelper(true,false,true,"bad-prefix");
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
         verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
     }
 
-    private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws InterruptedException, ExecutionException{
+    private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{
         asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
         Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
         Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
-        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
-        Node dataObjectMock = mock(Node.class);
-
-        if(create){
-            mapCreate.put(instanceIdentifierMock, dataObjectMock);
-        }
-        if(update){
-            mapUpdate.put(instanceIdentifierMock, dataObjectMock);
-        }
 
+        Node node01;
+        String nodeId = "Node01";
         doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
         doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
-        NetconfNode netconfNodeMock = mock(NetconfNode.class);
-        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+
         if(isNetconf){
-            doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
-            doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-            List<String> availableCapabilityList = new ArrayList<>();
-            availableCapabilityList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
-            doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-            doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+            node01 = NetconfTestUtils.getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix);
+
         } else {
-            doReturn(null).when(dataObjectMock).getAugmentation(NetconfNode.class);
+            node01 = NetconfTestUtils.getNode(nodeId);
         }
 
-        Optional optionalMock = mock(Optional.class);
-        Optional optionalBindingMountMock = mock(Optional.class);
-        NodeId nodeId = new NodeId("nodeId1");
-        doReturn(nodeId).when(dataObjectMock).getNodeId();
-        doReturn(optionalMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
-        doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
-        doReturn(true).when(optionalMock).isPresent();
-        doReturn(true).when(optionalBindingMountMock).isPresent();
-
-        DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
-        MountPoint mountPointMock = mock(MountPoint.class);
-        doReturn(domMountPointMock).when(optionalMock).get();
-        doReturn(mountPointMock).when(optionalBindingMountMock).get();
-
-        RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
-        Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
-        NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+        if(create){
+            mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+        }
+        if(update){
+            mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+        }
 
-        doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
-        doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
-        doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-        EventSourceRegistrationImpl esrMock = mock(EventSourceRegistrationImpl.class);
-        doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
     }
 
 }
\ No newline at end of file
@@ -5,15 +5,13 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
 
-//import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 import java.net.URI;
 import java.util.HashMap;
@@ -24,25 +22,25 @@ import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.BindingService;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSource;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -57,6 +55,7 @@ public class NetconfEventSourceTest {
 
     NetconfEventSource netconfEventSource;
     DOMMountPoint domMountPointMock;
+    MountPoint mountPointMock;
     JoinTopicInput joinTopicInputMock;
 
     @Before
@@ -64,34 +63,34 @@ public class NetconfEventSourceTest {
         Map<String, String> streamMap = new HashMap<>();
         streamMap.put("uriStr1", "string2");
         domMountPointMock = mock(DOMMountPoint.class);
+        mountPointMock = mock(MountPoint.class);
         DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
-
         RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
         Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
         NotificationsService notificationsServiceMock = mock(NotificationsService.class);
-
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-        org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
-                = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
-        org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
-                = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
-        doReturn(nodeId).when(node).getNodeId();
-        netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock);
-    }
 
-    @Test
-    public void onDataChangedTest(){
-        InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
-                .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
-        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-        NetconfNode dataObjectMock = mock(NetconfNode.class);
-        Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
-        dataChangeMap.put(brmIdent, dataObjectMock);
-        doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
-        doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
-        doReturn(true).when(dataObjectMock).isConnected();
-        netconfEventSource.onDataChanged(asyncDataChangeEventMock);
-        verify(dataObjectMock, times(2)).isConnected();
+        Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
+        DataBroker mpDataBroker = mock(DataBroker.class);
+        doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+        doReturn(true).when(optionalMpDataBroker).isPresent();
+        doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+        ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+        doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+        CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+        doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+        Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+        doReturn(avStreams).when(checkFeature).checkedGet();
+
+        netconfEventSource = new NetconfEventSource(
+                NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix),
+                streamMap,
+                domMountPointMock,
+                mountPointMock ,
+                domNotificationPublishServiceMock);
+
     }
 
     @Test
@@ -131,10 +130,12 @@ public class NetconfEventSourceTest {
 
         Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
         doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+        doReturn(true).when(optionalMock).isPresent();
         DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
         doReturn(domRpcServiceMock).when(optionalMock).get();
         CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
         doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+
     }
 
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java
new file mode 100644 (file)
index 0000000..5cc9f60
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+
+public final class NetconfTestUtils {
+
+    public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+    private NetconfTestUtils() {
+    }
+
+    public static Node getNetconfNode(String nodeIdent,String hostName,ConnectionStatus cs, String notificationCapabilityPrefix){
+
+        DomainName dn = new DomainName(hostName);
+        Host host = new Host(dn);
+
+        List<String> avCapList = new ArrayList<>();
+        avCapList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
+        AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build();
+        NetconfNode nn = new NetconfNodeBuilder()
+                .setConnectionStatus(cs)
+                .setHost(host)
+                .setAvailableCapabilities(avCaps)
+                .build();
+
+        NodeId nodeId = new NodeId(nodeIdent);
+        NodeKey nk = new NodeKey(nodeId);
+        NodeBuilder nb = new NodeBuilder();
+        nb.setKey(nk);
+
+        nb.addAugmentation(NetconfNode.class, nn);
+        return nb.build();
+    }
+
+    public static Node getNode(String nodeIdent){
+         NodeId nodeId = new NodeId(nodeIdent);
+         NodeKey nk = new NodeKey(nodeId);
+         NodeBuilder nb = new NodeBuilder();
+         nb.setKey(nk);
+         return nb.build();
+    }
+
+    public static InstanceIdentifier<Node> getInstanceIdentifier(Node node){
+        TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+        InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(NetworkTopology.class)
+                    .child(Topology.class, NETCONF_TOPOLOGY_KEY)
+                    .child(Node.class, node.getKey());
+        return nodeII;
+    }
+
+    public static Optional<Streams> getAvailableStream(String Name, boolean replaySupport){
+        Stream stream = new StreamBuilder()
+                .setName(new StreamNameType(Name))
+                .setReplaySupport(replaySupport)
+                .build();
+        List<Stream> streamList = new ArrayList<>();
+        streamList.add(stream);
+        Streams streams = new StreamsBuilder().setStream(streamList).build();
+        return Optional.of(streams);
+    }
+
+}