Decouple message bus from netconf connector 12/24612/10
authorTomas Cere <tcere@cisco.com>
Thu, 13 Aug 2015 09:06:22 +0000 (11:06 +0200)
committerTomas Cere <tcere@cisco.com>
Thu, 13 Aug 2015 09:11:27 +0000 (11:11 +0200)
Change-Id: I6a143e868adc1e5c7a9b114798e7009bb6ef8675
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
Signed-off-by: Tomas Cere <tcere@cisco.com>
38 files changed:
features/mdsal/src/main/resources/features.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml
opendaylight/md-sal/messagebus-impl/pom.xml
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java [deleted file]
opendaylight/md-sal/messagebus-util/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Providers.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java with 95% similarity]
opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotification.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java with 95% similarity]
opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Util.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java with 97% similarity]
opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotificationTest.java [moved from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java with 97% similarity]
opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/UtilTest.java [moved from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java with 98% similarity]
opendaylight/md-sal/pom.xml
opendaylight/netconf/features/netconf-connector/src/main/resources/features.xml
opendaylight/netconf/messagebus-netconf/pom.xml [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java with 77% similarity]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java with 91% similarity]
opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java [new file with mode: 0644]
opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java [new file with mode: 0644]
opendaylight/netconf/netconf-artifacts/pom.xml
opendaylight/netconf/pom.xml

index d5723b5..0182051 100644 (file)
         <configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
     </feature>
 
-    <!-- FIXME decouple messagebus code into messagebus and messagebus netconf -->
-    <feature name='odl-message-bus' version='${project.version}'>
-        <!--<feature version='${project.version}'>odl-netconf-connector</feature>-->
+    <feature name='odl-message-bus-collector' version='${project.version}'>
         <bundle>mvn:org.opendaylight.controller.model/model-inventory/${mdsal.version}</bundle>
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/messagebus-spi/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/messagebus-util/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
     </feature>
index 05cff81..61ba3cc 100644 (file)
                 <artifactId>messagebus-impl</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>messagebus-util</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
         </dependencies>
     </dependencyManagement>
index 4714c07..3e00d93 100644 (file)
                   <name>messagebus-app-impl</name>
                   <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
                   <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
-                      <type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
+                      <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
                       <name>binding-osgi-broker</name>
                   </binding-broker>
-                  <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
-                      <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
-                      <name>dom-broker</name>
-                  </dom-broker>
-                  <namespace-to-stream>
-                      <urn-prefix>urn:ietf:params:xml:ns:yang:smiv2</urn-prefix>
-                      <stream-name>SNMP</stream-name>
-                  </namespace-to-stream>
-                  <namespace-to-stream>
-                      <urn-prefix>urn:ietf:params:xml:ns:yang:ietf-syslog-notification</urn-prefix>
-                      <stream-name>SYSLOG</stream-name>
-                  </namespace-to-stream>
               </module>
           </modules>
           <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
index 2970568..108a295 100644 (file)
@@ -46,17 +46,15 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>messagebus-api</artifactId>\r
-            <version>1.3.0-SNAPSHOT</version>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>messagebus-util</artifactId>\r
         </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>messagebus-spi</artifactId>\r
-            <version>1.3.0-SNAPSHOT</version>\r
         </dependency>\r
-        <!--<dependency>-->\r
-            <!--<groupId>org.opendaylight.controller</groupId>-->\r
-            <!--<artifactId>sal-netconf-connector</artifactId>-->\r
-        <!--</dependency>-->\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-binding-config</artifactId>\r
index 10dd9ea..a4e5514 100644 (file)
@@ -7,29 +7,17 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-//import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
-import org.opendaylight.controller.messagebus.spi.EventSource;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
-import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.messagebus.app.util.Providers;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
 
@@ -58,55 +46,12 @@ public class MessageBusAppImplModule extends org.opendaylight.controller.config.
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-
         final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
-        final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
         final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
-        final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
-        final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
         final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
-        final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class);
-        final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
-//        final NetconfEventSourceManager netconfEventSourceManager
-//                = NetconfEventSourceManager.create(dataBroker,
-//                        domPublish,
-//                        domMount,
-//                        mountPointService,
-//                        eventSourceRegistryWrapper,
-//                        getNamespaceToStream());
-//        eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
+        final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
         LOGGER.info("Messagebus initialized");
-        return eventSourceRegistryWrapper;
-
+        return eventSourceTopology;
     }
 
-    //TODO: separate NetconfEventSource into separate bundle, remove this wrapper, return EventSourceTopology directly as EventSourceRegistry
-    private class EventSourceRegistryWrapper implements EventSourceRegistry{
-
-        private final EventSourceRegistry baseEventSourceRegistry;
-        private final Set<AutoCloseable> autoCloseables = new HashSet<>();
-
-        public EventSourceRegistryWrapper(EventSourceRegistry baseEventSourceRegistry) {
-            this.baseEventSourceRegistry = baseEventSourceRegistry;
-        }
-
-        public void addAutoCloseable(AutoCloseable ac){
-            Preconditions.checkNotNull(ac);
-            autoCloseables.add(ac);
-        }
-
-        @Override
-        public void close() throws Exception {
-            for(AutoCloseable ac : autoCloseables){
-                ac.close();
-            }
-            baseEventSourceRegistry.close();
-        }
-
-        @Override
-        public <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource) {
-            return this.baseEventSourceRegistry.registerEventSource(eventSource);
-        }
-
-    }
 }
index 3aa470b..b79d12b 100644 (file)
@@ -16,6 +16,7 @@ import java.util.concurrent.Future;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.app.util.Util;
 import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java
deleted file mode 100644 (file)
index 5eb32d6..0000000
+++ /dev/null
@@ -1,340 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import static com.google.common.util.concurrent.Futures.immediateFuture;
-//
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.Date;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Set;
-//import java.util.concurrent.Future;
-//import java.util.regex.Pattern;
-//
-//import javax.xml.stream.XMLStreamException;
-//import javax.xml.transform.dom.DOMResult;
-//import javax.xml.transform.dom.DOMSource;
-//
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-//import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
-//import org.opendaylight.controller.messagebus.app.impl.Util;
-//import org.opendaylight.controller.messagebus.spi.EventSource;
-//import org.opendaylight.controller.config.util.xml.XmlUtil;
-//import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.common.RpcResult;
-//import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-//import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
-//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-//import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-//import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-//import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.w3c.dom.Document;
-//import org.w3c.dom.Element;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.base.Preconditions;
-//import com.google.common.base.Throwables;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class NetconfEventSource implements EventSource, DOMNotificationListener {
-//
-//    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
-//
-//    private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
-//    private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
-//    private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id"));
-//    private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
-//    private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
-//
-//    private final String nodeId;
-//    private final Node node;
-//
-//    private final DOMMountPoint netconfMount;
-//    private final MountPoint mountPoint;
-//    private final DOMNotificationPublishService domPublish;
-//
-//    private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
-//    private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
-//
-//    public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
-//        this.netconfMount = Preconditions.checkNotNull(netconfMount);
-//        this.mountPoint = Preconditions.checkNotNull(mountPoint);
-//        this.node = Preconditions.checkNotNull(node);
-//        this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
-//        this.domPublish = Preconditions.checkNotNull(publishService);
-//        this.nodeId = node.getNodeId().getValue();
-//        this.initializeNotificationTopicRegistrationList();
-//
-//        LOG.info("NetconfEventSource [{}] created.", this.nodeId);
-//    }
-//
-//    private void initializeNotificationTopicRegistrationList() {
-//        notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
-//        Optional<Map<String, Stream>> streamMap = getAvailableStreams();
-//        if(streamMap.isPresent()){
-//            LOG.debug("Stream configuration compare...");
-//            for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
-//                final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
-//                LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
-//                if(streamMap.get().containsKey(streamName)){
-//                    LOG.debug("Stream containig on device");
-//                    notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
-//                }
-//            }
-//        }
-//    }
-//
-//    private Optional<Map<String, Stream>> getAvailableStreams(){
-//
-//        Map<String,Stream> streamMap = null;
-//        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
-//        Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
-//
-//        if(dataBroker.isPresent()){
-//            LOG.debug("GET Available streams ...");
-//            ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
-//            CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
-//
-//            try {
-//                Optional<Streams> streams = checkFeature.checkedGet();
-//                if(streams.isPresent()){
-//                    streamMap = new HashMap<>();
-//                    for(Stream stream : streams.get().getStream()){
-//                        LOG.debug("*** find stream {}", stream.getName().getValue());
-//                        streamMap.put(stream.getName().getValue(), stream);
-//                    }
-//                }
-//            } catch (ReadFailedException e) {
-//                LOG.warn("Can not read streams for node {}",this.nodeId);
-//            }
-//
-//        } else {
-//            LOG.warn("No databroker on node {}", this.nodeId);
-//        }
-//
-//        return Optional.fromNullable(streamMap);
-//    }
-//
-//    @Override
-//    public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
-//        LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
-//        final NotificationPattern notificationPattern = input.getNotificationPattern();
-//        final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
-//        return registerTopic(input.getTopicId(),matchingNotifications);
-//
-//    }
-//
-//    @Override
-//    public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
-//         for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
-//             reg.unRegisterNotificationTopic(input.getTopicId());
-//         }
-//        return Util.resultRpcSuccessFor((Void) null) ;
-//    }
-//
-//    private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
-//        LOG.debug("Join topic {} - register");
-//        JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
-//        if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
-//            LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() );
-//            final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
-//            if(notifyService.isPresent()){
-//                int registeredNotificationCount = 0;
-//                for(SchemaPath schemaNotification : notificationsToSubscribe){
-//                   for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
-//                       LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName());
-//                       if(reg.checkNotificationPath(schemaNotification)){
-//                           LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
-//                           boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
-//                           if(regSuccess){
-//                              registeredNotificationCount = registeredNotificationCount +1;
-//                           }
-//                       }
-//                   }
-//                }
-//                if(registeredNotificationCount > 0){
-//                    joinTopicStatus = JoinTopicStatus.Up;
-//                }
-//            } else {
-//                LOG.warn("NO DOMNotification service on node {}", this.nodeId);
-//            }
-//        } else {
-//            LOG.debug("Notifications to subscribe has NOT found");
-//        }
-//
-//        final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
-//        return immediateFuture(RpcResultBuilder.success(output).build());
-//
-//    }
-//
-//    public void reActivateStreams(){
-//        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
-//           LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
-//            reg.reActivateNotificationSource();
-//        }
-//    }
-//
-//    public void deActivateStreams(){
-//        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
-//           LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
-//           reg.deActivateNotificationSource();
-//        }
-//    }
-//
-//    @Override
-//    public void onNotification(final DOMNotification notification) {
-//        SchemaPath notificationPath = notification.getType();
-//        Date notificationEventTime = null;
-//        if(notification instanceof DOMEvent){
-//            notificationEventTime = ((DOMEvent) notification).getEventTime();
-//        }
-//        for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){
-//            ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
-//            if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){
-//
-//                if(notifReg instanceof StreamNotificationTopicRegistration){
-//                    StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg;
-//                    streamReg.setLastEventTime(notificationEventTime);
-//                }
-//
-//                for(TopicId topicId : topicIdsForNotification){
-//                    publishNotification(notification, topicId);
-//                    LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
-//                }
-//
-//            }
-//        }
-//    }
-//
-//    private void publishNotification(final DOMNotification notification, TopicId topicId){
-//         final ContainerNode topicNotification = Builders.containerBuilder()
-//                 .withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
-//                 .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
-//                 .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId))
-//                 .withChild(encapsulate(notification))
-//                 .build();
-//         try {
-//             domPublish.putNotification(new TopicDOMNotification(topicNotification));
-//         } catch (final InterruptedException e) {
-//             throw Throwables.propagate(e);
-//         }
-//    }
-//
-//    private AnyXmlNode encapsulate(final DOMNotification body) {
-//        // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
-//        final Document doc = XmlUtil.newDocument();
-//        final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
-//        final Element element = XmlUtil.createElement(doc , "payload", namespace);
-//
-//        final DOMResult result = new DOMResult(element);
-//
-//        final SchemaContext context = getDOMMountPoint().getSchemaContext();
-//        final SchemaPath schemaPath = body.getType();
-//        try {
-//            NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
-//            return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG)
-//                    .withValue(new DOMSource(element))
-//                    .build();
-//        } catch (IOException | XMLStreamException e) {
-//            LOG.error("Unable to encapsulate notification.",e);
-//            throw Throwables.propagate(e);
-//        }
-//    }
-//
-//    private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
-//        // FIXME: default language should already be regex
-//        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
-//
-//        final Pattern pattern = Pattern.compile(regex);
-//        List<SchemaPath> availableNotifications = getAvailableNotifications();
-//        if(availableNotifications == null || availableNotifications.isEmpty()){
-//            return null;
-//        }
-//        return Util.expandQname(availableNotifications, pattern);
-//    }
-//
-//    @Override
-//    public void close() throws Exception {
-//        for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){
-//            streamReg.close();
-//        }
-//    }
-//
-//    @Override
-//    public NodeKey getSourceNodeKey(){
-//        return getNode().getKey();
-//    }
-//
-//    @Override
-//    public List<SchemaPath> getAvailableNotifications() {
-//
-//        final List<SchemaPath> availNotifList = new ArrayList<>();
-//        // add Event Source Connection status notification
-//        availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
-//
-//        // FIXME: use SchemaContextListener to get changes asynchronously
-//        final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications();
-//        // add all known notifications from netconf device
-//        for (final NotificationDefinition nd : availableNotifications) {
-//            availNotifList.add(nd.getPath());
-//        }
-//        return availNotifList;
-//    }
-//
-//    public Node getNode() {
-//        return node;
-//    }
-//
-//    DOMMountPoint getDOMMountPoint() {
-//        return netconfMount;
-//    }
-//
-//    MountPoint getMountPoint() {
-//        return mountPoint;
-//    }
-//
-//    NetconfNode getNetconfNode(){
-//        return node.getAugmentation(NetconfNode.class);
-//    }
-//
-//}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java
deleted file mode 100644 (file)
index 8106d90..0000000
+++ /dev/null
@@ -1,213 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.concurrent.ConcurrentHashMap;
-//
-//import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-//import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.binding.DataObject;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.base.Preconditions;
-//
-//public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
-//
-//    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class);
-//    private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
-//    private static final InstanceIdentifier<Node> NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class)
-//                .child(Topology.class, NETCONF_TOPOLOGY_KEY)
-//                .child(Node.class);
-//
-//    private final Map<String, String> streamMap;
-//    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
-//    private final DOMNotificationPublishService publishService;
-//    private final DOMMountPointService domMounts;
-//    private final MountPointService mountPointService;
-//    private ListenerRegistration<DataChangeListener> listenerRegistration;
-//    private final EventSourceRegistry eventSourceRegistry;
-//
-//    public static NetconfEventSourceManager create(final DataBroker dataBroker,
-//            final DOMNotificationPublishService domPublish,
-//            final DOMMountPointService domMount,
-//            final MountPointService bindingMount,
-//            final EventSourceRegistry eventSourceRegistry,
-//            final List<NamespaceToStream> namespaceMapping){
-//
-//        final NetconfEventSourceManager eventSourceManager =
-//                new NetconfEventSourceManager(domPublish, domMount,bindingMount, eventSourceRegistry, namespaceMapping);
-//
-//        eventSourceManager.initialize(dataBroker);
-//
-//        return eventSourceManager;
-//
-//    }
-//
-//    private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
-//                              final DOMMountPointService domMount,
-//                              final MountPointService bindingMount,
-//                              final EventSourceRegistry eventSourceRegistry,
-//                              final List<NamespaceToStream> namespaceMapping) {
-//
-//        Preconditions.checkNotNull(domPublish);
-//        Preconditions.checkNotNull(domMount);
-//        Preconditions.checkNotNull(bindingMount);
-//        Preconditions.checkNotNull(eventSourceRegistry);
-//        Preconditions.checkNotNull(namespaceMapping);
-//        this.streamMap = namespaceToStreamMapping(namespaceMapping);
-//        this.domMounts = domMount;
-//        this.mountPointService = bindingMount;
-//        this.publishService = domPublish;
-//        this.eventSourceRegistry = eventSourceRegistry;
-//    }
-//
-//    private void initialize(final DataBroker dataBroker){
-//        Preconditions.checkNotNull(dataBroker);
-//        listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
-//        LOG.info("NetconfEventSourceManager initialized.");
-//    }
-//
-//    private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
-//        final Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
-//
-//        for (final NamespaceToStream nToS  : namespaceMapping) {
-//            streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
-//        }
-//
-//        return streamMap;
-//    }
-//
-//    @Override
-//    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-//
-//        LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
-//        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
-//            if (changeEntry.getValue() instanceof Node) {
-//                nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue());
-//            }
-//        }
-//
-//        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
-//            if (changeEntry.getValue() instanceof Node) {
-//                nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
-//            }
-//        }
-//
-//        for(InstanceIdentifier<?> removePath : event.getRemovedPaths()){
-//            DataObject removeObject = event.getOriginalData().get(removePath);
-//            if(removeObject instanceof Node){
-//                nodeRemoved(removePath);
-//            }
-//        }
-//
-//    }
-//
-//    private void nodeCreated(final InstanceIdentifier<?> key, final Node node){
-//        Preconditions.checkNotNull(key);
-//        if(validateNode(node) == false){
-//            LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
-//            return;
-//        }
-//        LOG.info("Netconf event source [{}] is creating...", key.toString());
-//        NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this);
-//        if(nesr != null){
-//            NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr);
-//            if(nesrOld != null){
-//                nesrOld.close();
-//            }
-//        }
-//    }
-//
-//    private void nodeUpdated(final InstanceIdentifier<?> key, final Node node){
-//        Preconditions.checkNotNull(key);
-//        if(validateNode(node) == false){
-//            LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
-//            return;
-//        }
-//
-//        LOG.info("Netconf event source [{}] is updating...", key.toString());
-//        NetconfEventSourceRegistration nesr = registrationMap.get(key);
-//        if(nesr != null){
-//            nesr.updateStatus();
-//        } else {
-//            nodeCreated(key, node);
-//        }
-//    }
-//
-//    private void nodeRemoved(final InstanceIdentifier<?> key){
-//        Preconditions.checkNotNull(key);
-//        LOG.info("Netconf event source [{}] is removing...", key.toString());
-//        NetconfEventSourceRegistration nesr = registrationMap.remove(key);
-//        if(nesr != null){
-//            nesr.close();
-//        }
-//    }
-//
-//    private boolean validateNode(final Node node){
-//        if(node == null){
-//            return false;
-//        }
-//        return isNetconfNode(node);
-//    }
-//
-//    Map<String, String> getStreamMap() {
-//        return streamMap;
-//    }
-//
-//    DOMNotificationPublishService getPublishService() {
-//        return publishService;
-//    }
-//
-//    DOMMountPointService getDomMounts() {
-//        return domMounts;
-//    }
-//
-//    EventSourceRegistry getEventSourceRegistry() {
-//        return eventSourceRegistry;
-//    }
-//
-//    MountPointService getMountPointService() {
-//        return mountPointService;
-//    }
-//
-//    private boolean isNetconfNode(final Node node)  {
-//        return node.getAugmentation(NetconfNode.class) != null ;
-//    }
-//
-//    @Override
-//    public void close() {
-//        listenerRegistration.close();
-//        for(final NetconfEventSourceRegistration reg : registrationMap.values()){
-//            reg.close();
-//        }
-//        registrationMap.clear();
-//    }
-//
-//}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java
deleted file mode 100644 (file)
index 9bc0a46..0000000
+++ /dev/null
@@ -1,196 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.List;
-//
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.base.Preconditions;
-//
-///**
-// * Helper class to keep connection status of netconf node  and event source registration object
-// *
-// */
-//public class NetconfEventSourceRegistration implements AutoCloseable{
-//
-//    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class);
-//    private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
-//            .node(NetworkTopology.QNAME)
-//            .node(Topology.QNAME)
-//            .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
-//            .node(Node.QNAME)
-//            .build();
-//    private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
-//    private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
-//
-//    private final Node node;
-//    private final InstanceIdentifier<?> instanceIdent;
-//    private final NetconfEventSourceManager netconfEventSourceManager;
-//    private ConnectionStatus currentNetconfConnStatus;
-//    private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
-//
-//    public static NetconfEventSourceRegistration create(final InstanceIdentifier<?> instanceIdent, final Node node,
-//                final NetconfEventSourceManager netconfEventSourceManager){
-//        Preconditions.checkNotNull(instanceIdent);
-//        Preconditions.checkNotNull(node);
-//        Preconditions.checkNotNull(netconfEventSourceManager);
-//        if(isEventSource(node) == false){
-//            return null;
-//        }
-//        NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager);
-//        nesr.updateStatus();
-//        LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
-//        return nesr;
-//    }
-//
-//    private static boolean isEventSource(final Node node) {
-//        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
-//        if(netconfNode == null){
-//            return false;
-//        }
-//        if (netconfNode.getAvailableCapabilities() == null) {
-//            return false;
-//        }
-//        final List<String> capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability();
-//        if(capabilities == null || capabilities.isEmpty()) {
-//             return false;
-//        }
-//        for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) {
-//            if(capability.startsWith(NotificationCapabilityPrefix)) {
-//                return true;
-//            }
-//        }
-//
-//        return false;
-//    }
-//
-//    private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node, final NetconfEventSourceManager netconfEventSourceManager) {
-//        this.instanceIdent = instanceIdent;
-//        this.node = node;
-//        this.netconfEventSourceManager = netconfEventSourceManager;
-//        this.eventSourceRegistration =null;
-//    }
-//
-//    public Node getNode() {
-//        return node;
-//    }
-//
-//    Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
-//        return Optional.fromNullable(eventSourceRegistration);
-//    }
-//
-//    NetconfNode getNetconfNode(){
-//        return node.getAugmentation(NetconfNode.class);
-//    }
-//
-//    void updateStatus(){
-//        ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus();
-//        LOG.info("Change status on node {}, new status is {}",this.node.getNodeId().getValue(),netconfConnStatus);
-//        if(netconfConnStatus.equals(currentNetconfConnStatus)){
-//            return;
-//        }
-//        changeStatus(netconfConnStatus);
-//    }
-//
-//    private boolean checkConnectionStatusType(ConnectionStatus status){
-//        if(    status == ConnectionStatus.Connected
-//            || status == ConnectionStatus.Connecting
-//            || status == ConnectionStatus.UnableToConnect){
-//            return true;
-//        }
-//        return false;
-//    }
-//
-//    private void changeStatus(ConnectionStatus newStatus){
-//        Preconditions.checkNotNull(newStatus);
-//        if(checkConnectionStatusType(newStatus) == false){
-//            throw new IllegalStateException("Unknown new Netconf Connection Status");
-//        }
-//        if(this.currentNetconfConnStatus == null){
-//            if (newStatus == ConnectionStatus.Connected){
-//                registrationEventSource();
-//            }
-//        } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting){
-//            if (newStatus == ConnectionStatus.Connected){
-//                if(this.eventSourceRegistration == null){
-//                    registrationEventSource();
-//                } else {
-//                    // reactivate stream on registered event source (invoke publish notification about connection)
-//                    this.eventSourceRegistration.getInstance().reActivateStreams();
-//                }
-//            }
-//        } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
-//
-//            if(newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect){
-//                // deactivate streams on registered event source (invoke publish notification about connection)
-//                this.eventSourceRegistration.getInstance().deActivateStreams();
-//            }
-//        } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect){
-//            if(newStatus == ConnectionStatus.Connected){
-//                if(this.eventSourceRegistration == null){
-//                    registrationEventSource();
-//                } else {
-//                    // reactivate stream on registered event source (invoke publish notification about connection)
-//                    this.eventSourceRegistration.getInstance().reActivateStreams();
-//                }
-//            }
-//        } else {
-//            throw new IllegalStateException("Unknown current Netconf Connection Status");
-//        }
-//        this.currentNetconfConnStatus = newStatus;
-//    }
-//
-//    private void registrationEventSource(){
-//        final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService().getMountPoint(instanceIdent);
-//        final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts().getMountPoint(domMountPath(node.getNodeId()));
-//        EventSourceRegistration<NetconfEventSource> registration = null;
-//        if(domMountPoint.isPresent() && mountPoint.isPresent()) {
-//            final NetconfEventSource netconfEventSource = new NetconfEventSource(
-//                    node,
-//                    netconfEventSourceManager.getStreamMap(),
-//                    domMountPoint.get(),
-//                    mountPoint.get(),
-//                    netconfEventSourceManager.getPublishService());
-//            registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
-//            LOG.info("Event source {} has been registered",node.getNodeId().getValue());
-//        }
-//        this.eventSourceRegistration = registration;
-//      }
-//
-//    private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
-//        return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
-//    }
-//
-//    private void closeEventSourceRegistration(){
-//        if(getEventSourceRegistration().isPresent()){
-//            getEventSourceRegistration().get().close();
-//        }
-//    }
-//
-//    @Override
-//    public void close() {
-//        closeEventSourceRegistration();
-//    }
-//
-//}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java
deleted file mode 100644 (file)
index 64ddb31..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.ArrayList;
-//import java.util.Date;
-//import java.util.List;
-//import java.util.concurrent.ConcurrentHashMap;
-//
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-//import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-//import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
-//import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
-//
-//    private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
-//    private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
-//    private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
-//    private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime"));
-//
-//    final private DOMMountPoint domMountPoint;
-//    final private String nodeId;
-//    final private NetconfEventSource netconfEventSource;
-//    final private Stream stream;
-//    private Date lastEventTime;
-//
-//    private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
-//    private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
-//
-//    public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) {
-//        super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
-//        this.domMountPoint = netconfEventSource.getDOMMountPoint();
-//        this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
-//        this.netconfEventSource = netconfEventSource;
-//        this.stream = stream;
-//        this.lastEventTime= null;
-//        setReplaySupported(this.stream.isReplaySupport());
-//        setActive(false);
-//        LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
-//    }
-//
-//    void activateNotificationSource() {
-//        if(isActive() == false){
-//            LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
-//            final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-//                    .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()))
-//                    .build();
-//            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-//            try {
-//                csFuture.checkedGet();
-//                setActive(true);
-//            } catch (DOMRpcException e) {
-//                LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
-//                setActive(false);
-//                return;
-//            }
-//        } else {
-//            LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
-//        }
-//    }
-//
-//    void reActivateNotificationSource(){
-//        if(isActive()){
-//            LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
-//            DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder =
-//                    Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-//                    .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
-//            if(isReplaySupported() && this.getLastEventTime() != null){
-//                inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
-//            }
-//            final ContainerNode input = inputBuilder.build();
-//            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-//            try {
-//                csFuture.checkedGet();
-//                setActive(true);
-//            } catch (DOMRpcException e) {
-//                LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
-//                setActive(false);
-//                return;
-//            }
-//        }
-//    }
-//
-//    @Override
-//    void deActivateNotificationSource() {
-//        // no operations need
-//    }
-//
-//    private void closeStream() {
-//        if(isActive()){
-//            for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
-//                reg.close();
-//            }
-//            notificationRegistrationMap.clear();
-//            notificationTopicMap.clear();
-//            setActive(false);
-//        }
-//    }
-//
-//    private String getStreamName() {
-//        return getSourceName();
-//    }
-//
-//    @Override
-//    ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
-//        return notificationTopicMap.get(notificationPath);
-//    }
-//
-//    @Override
-//    boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
-//
-//        if(checkNotificationPath(notificationPath) == false){
-//            LOG.debug("Bad SchemaPath for notification try to register");
-//            return false;
-//        }
-//
-//        final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
-//        if(notifyService.isPresent() == false){
-//            LOG.debug("DOMNotificationService is not present");
-//            return false;
-//        }
-//
-//        activateNotificationSource();
-//        if(isActive() == false){
-//            LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString());
-//            return false;
-//        }
-//
-//        ListenerRegistration<NetconfEventSource> registration =
-//                notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
-//        notificationRegistrationMap.put(notificationPath, registration);
-//        ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
-//        if(topicIds == null){
-//            topicIds = new ArrayList<>();
-//            topicIds.add(topicId);
-//        } else {
-//            if(topicIds.contains(topicId) == false){
-//                topicIds.add(topicId);
-//            }
-//        }
-//
-//        notificationTopicMap.put(notificationPath, topicIds);
-//        return true;
-//    }
-//
-//    @Override
-//    synchronized void unRegisterNotificationTopic(TopicId topicId) {
-//        List<SchemaPath> notificationPathToRemove = new ArrayList<>();
-//        for(SchemaPath notifKey : notificationTopicMap.keySet()){
-//            ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
-//            if(topicList != null){
-//                topicList.remove(topicId);
-//                if(topicList.isEmpty()){
-//                    notificationPathToRemove.add(notifKey);
-//                }
-//            }
-//        }
-//        for(SchemaPath notifKey : notificationPathToRemove){
-//            notificationTopicMap.remove(notifKey);
-//            ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
-//            if(reg != null){
-//                reg.close();
-//            }
-//        }
-//    }
-//
-//    Optional<Date> getLastEventTime() {
-//        return Optional.fromNullable(lastEventTime);
-//    }
-//
-//
-//    void setLastEventTime(Date lastEventTime) {
-//        this.lastEventTime = lastEventTime;
-//    }
-//
-//    @Override
-//    public void close() throws Exception {
-//        closeStream();
-//    }
-//
-//}
index 320afcc..01c1ba2 100644 (file)
@@ -10,7 +10,7 @@ module messagebus-app-impl {
 
     description
         "Service definition for Message Bus application implementation.";
+
     revision "2015-02-03" {
         description "Second revision. Message Bus opensourcing";
     }
@@ -24,7 +24,7 @@ module messagebus-app-impl {
     augment "/config:modules/config:module/config:configuration" {
         case messagebus-app-impl {
             when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
-            
+
             container binding-broker {
                 uses config:service-ref {
                     refine type {
@@ -34,32 +34,7 @@ module messagebus-app-impl {
                 }
             }
 
-            container dom-broker {
-                uses config:service-ref {
-                    refine type {
-                        mandatory true;
-                        config:required-identity dom:dom-broker-osgi-registry;
-                    }
-                }
-            }
-
-            list namespace-to-stream {
-                key urn-prefix;
-
-                leaf urn-prefix {
-                    type string;
-                }
-
-                leaf stream-name {
-                    type string;
-                }
-            }
-        }
-    }
-    
-    augment "/config:modules/config:module/config:state" {
-        case messagebus-app-impl {
-            when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
         }
     }
+
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java
deleted file mode 100644 (file)
index 2ae7de2..0000000
+++ /dev/null
@@ -1,177 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Matchers.eq;
-//import static org.mockito.Matchers.notNull;
-//import static org.mockito.Mockito.doReturn;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//
-//import org.junit.Before;
-//import org.junit.BeforeClass;
-//import org.junit.Test;
-//import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.binding.api.MountPointService;
-//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-//import org.opendaylight.controller.messagebus.spi.EventSource;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
-//import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
-//import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.binding.DataObject;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class NetconfEventSourceManagerTest {
-//
-//    NetconfEventSourceManager netconfEventSourceManager;
-//    ListenerRegistration listenerRegistrationMock;
-//    DOMMountPointService domMountPointServiceMock;
-//    MountPointService mountPointServiceMock;
-//    EventSourceTopology eventSourceTopologyMock;
-//    AsyncDataChangeEvent asyncDataChangeEventMock;
-//    RpcProviderRegistry rpcProviderRegistryMock;
-//    EventSourceRegistry eventSourceRegistry;
-//    @BeforeClass
-//    public static void initTestClass() throws IllegalAccessException, InstantiationException {
-//    }
-//
-//    @Before
-//    public void setUp() throws Exception {
-//        DataBroker dataBrokerMock = mock(DataBroker.class);
-//        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
-//        domMountPointServiceMock = mock(DOMMountPointService.class);
-//        mountPointServiceMock = mock(MountPointService.class);
-//        eventSourceTopologyMock = mock(EventSourceTopology.class);
-//        rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-//        eventSourceRegistry = mock(EventSourceRegistry.class);
-//        List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
-//
-//        listenerRegistrationMock = mock(ListenerRegistration.class);
-//        doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
-//
-//        Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
-//        doReturn(true).when(optionalDomMountServiceMock).isPresent();
-//        doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
-//
-//        DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
-//        doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
-//
-//
-//        Optional optionalBindingMountMock = mock(Optional.class);
-//        doReturn(true).when(optionalBindingMountMock).isPresent();
-//
-//        MountPoint mountPointMock = mock(MountPoint.class);
-//        doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
-//        doReturn(mountPointMock).when(optionalBindingMountMock).get();
-//
-//        Optional optionalMpDataBroker = mock(Optional.class);
-//        DataBroker mpDataBroker = mock(DataBroker.class);
-//        doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
-//        doReturn(true).when(optionalMpDataBroker).isPresent();
-//        doReturn(mpDataBroker).when(optionalMpDataBroker).get();
-//
-//        ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
-//        doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
-//        CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
-//        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
-//        doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
-//        Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
-//        doReturn(avStreams).when(checkFeature).checkedGet();
-//
-//        EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
-//
-//        netconfEventSourceManager =
-//                NetconfEventSourceManager.create(dataBrokerMock,
-//                        domNotificationPublishServiceMock,
-//                        domMountPointServiceMock,
-//                        mountPointServiceMock,
-//                        eventSourceRegistry,
-//                        namespaceToStreamList);
-//    }
-//
-//    @Test
-//    public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception {
-//        onDataChangedTestHelper(true,false,true,NetconfTestUtils.notification_capability_prefix);
-//        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-//        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
-//    }
-//
-//    @Test
-//    public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception {
-//        onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix);
-//        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-//        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
-//    }
-//
-//    @Test
-//    public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception {
-//        onDataChangedTestHelper(false,true,false,NetconfTestUtils.notification_capability_prefix);
-//        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-//        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
-//    }
-//
-//    @Test
-//    public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception {
-//        onDataChangedTestHelper(true,false,true,"bad-prefix");
-//        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-//        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
-//    }
-//
-//    private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{
-//        asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-//        Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
-//        Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
-//
-//        Node node01;
-//        String nodeId = "Node01";
-//        doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
-//        doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
-//
-//        if(isNetconf){
-//            node01 = NetconfTestUtils.getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix);
-//
-//        } else {
-//            node01 = NetconfTestUtils.getNode(nodeId);
-//        }
-//
-//        if(create){
-//            mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
-//        }
-//        if(update){
-//            mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
-//        }
-//
-//    }
-//
-//}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java
deleted file mode 100644 (file)
index 88d7d4f..0000000
+++ /dev/null
@@ -1,141 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//
-//import static org.junit.Assert.assertNotNull;
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Mockito.doReturn;
-//import static org.mockito.Mockito.mock;
-//
-//import java.net.URI;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.opendaylight.controller.md.sal.binding.api.BindingService;
-//import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-//import org.opendaylight.controller.md.sal.binding.api.MountPoint;
-//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-//import org.opendaylight.controller.md.sal.dom.api.DOMService;
-//import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//import org.opendaylight.yangtools.yang.common.QName;
-//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-//import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-//import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-//
-//import com.google.common.base.Optional;
-//import com.google.common.util.concurrent.CheckedFuture;
-//
-//public class NetconfEventSourceTest {
-//
-//    NetconfEventSource netconfEventSource;
-//    DOMMountPoint domMountPointMock;
-//    MountPoint mountPointMock;
-//    JoinTopicInput joinTopicInputMock;
-//
-//    @Before
-//    public void setUp() throws Exception {
-//        Map<String, String> streamMap = new HashMap<>();
-//        streamMap.put("uriStr1", "string2");
-//        domMountPointMock = mock(DOMMountPoint.class);
-//        mountPointMock = mock(MountPoint.class);
-//        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
-//        RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
-//        Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
-//        NotificationsService notificationsServiceMock = mock(NotificationsService.class);
-//        doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-//
-//        Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
-//        DataBroker mpDataBroker = mock(DataBroker.class);
-//        doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
-//        doReturn(true).when(optionalMpDataBroker).isPresent();
-//        doReturn(mpDataBroker).when(optionalMpDataBroker).get();
-//
-//        ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
-//        doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
-//        CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
-//        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
-//        doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
-//        Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
-//        doReturn(avStreams).when(checkFeature).checkedGet();
-//
-//        netconfEventSource = new NetconfEventSource(
-//                NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix),
-//                streamMap,
-//                domMountPointMock,
-//                mountPointMock ,
-//                domNotificationPublishServiceMock);
-//
-//    }
-//
-//    @Test
-//    public void joinTopicTest() throws Exception{
-//        joinTopicTestHelper();
-//        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
-//    }
-//
-//    private void joinTopicTestHelper() throws Exception{
-//        joinTopicInputMock = mock(JoinTopicInput.class);
-//        TopicId topicId = new TopicId("topicID007");
-//        doReturn(topicId).when(joinTopicInputMock).getTopicId();
-//        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
-//        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
-//        doReturn("uriStr1").when(notificationPatternMock).getValue();
-//
-//        SchemaContext schemaContextMock = mock(SchemaContext.class);
-//        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
-//        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
-//        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
-//        notificationDefinitionSet.add(notificationDefinitionMock);
-//
-//        URI uri = new URI("uriStr1");
-//        QName qName = new QName(uri, "localName1");
-//        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
-//        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
-//        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
-//
-//        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
-//        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
-//        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
-//
-//        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
-//        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
-//        ListenerRegistration<NetconfEventSource> listenerRegistrationMock = (ListenerRegistration<NetconfEventSource>)mock(ListenerRegistration.class);
-//        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class));
-//
-//        Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
-//        doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
-//        doReturn(true).when(optionalMock).isPresent();
-//        DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
-//        doReturn(domRpcServiceMock).when(optionalMock).get();
-//        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
-//        doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
-//
-//    }
-//
-//}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java
deleted file mode 100644 (file)
index b92dce4..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-///*
-// * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-// *
-// * This program and the accompanying materials are made available under the
-// * terms of the Eclipse Public License v1.0 which accompanies this distribution,
-// * and is available at http://www.eclipse.org/legal/epl-v10.html
-// */
-//package org.opendaylight.controller.messagebus.eventsources.netconf;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
-//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
-//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-//
-//import com.google.common.base.Optional;
-//
-//public final class NetconfTestUtils {
-//
-//    public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
-//
-//    private NetconfTestUtils() {
-//    }
-//
-//    public static Node getNetconfNode(String nodeIdent,String hostName,ConnectionStatus cs, String notificationCapabilityPrefix){
-//
-//        DomainName dn = new DomainName(hostName);
-//        Host host = new Host(dn);
-//
-//        List<String> avCapList = new ArrayList<>();
-//        avCapList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
-//        AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build();
-//        NetconfNode nn = new NetconfNodeBuilder()
-//                .setConnectionStatus(cs)
-//                .setHost(host)
-//                .setAvailableCapabilities(avCaps)
-//                .build();
-//
-//        NodeId nodeId = new NodeId(nodeIdent);
-//        NodeKey nk = new NodeKey(nodeId);
-//        NodeBuilder nb = new NodeBuilder();
-//        nb.setKey(nk);
-//
-//        nb.addAugmentation(NetconfNode.class, nn);
-//        return nb.build();
-//    }
-//
-//    public static Node getNode(String nodeIdent){
-//         NodeId nodeId = new NodeId(nodeIdent);
-//         NodeKey nk = new NodeKey(nodeId);
-//         NodeBuilder nb = new NodeBuilder();
-//         nb.setKey(nk);
-//         return nb.build();
-//    }
-//
-//    public static InstanceIdentifier<Node> getInstanceIdentifier(Node node){
-//        TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
-//        InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(NetworkTopology.class)
-//                    .child(Topology.class, NETCONF_TOPOLOGY_KEY)
-//                    .child(Node.class, node.getKey());
-//        return nodeII;
-//    }
-//
-//    public static Optional<Streams> getAvailableStream(String Name, boolean replaySupport){
-//        Stream stream = new StreamBuilder()
-//                .setName(new StreamNameType(Name))
-//                .setReplaySupport(replaySupport)
-//                .build();
-//        List<Stream> streamList = new ArrayList<>();
-//        streamList.add(stream);
-//        Streams streams = new StreamsBuilder().setStream(streamList).build();
-//        return Optional.of(streams);
-//    }
-//
-//}
diff --git a/opendaylight/md-sal/messagebus-util/pom.xml b/opendaylight/md-sal/messagebus-util/pom.xml
new file mode 100644 (file)
index 0000000..6cc572e
--- /dev/null
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.\r
+\r
+This program and the accompanying materials are made available under the\r
+terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+and is available at http://www.eclipse.org/legal/epl-v10.html\r
+-->\r
+<project xmlns="http://maven.apache.org/POM/4.0.0"\r
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+    <modelVersion>4.0.0</modelVersion>\r
+\r
+    <parent>\r
+        <groupId>org.opendaylight.controller</groupId>\r
+        <artifactId>sal-parent</artifactId>\r
+        <version>1.3.0-SNAPSHOT</version>\r
+    </parent>\r
+\r
+    <artifactId>messagebus-util</artifactId>\r
+    <name>${project.artifactId}</name>\r
+\r
+    <packaging>bundle</packaging>\r
+\r
+    <dependencies>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-core-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-binding-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>messagebus-api</artifactId>\r
+        </dependency>\r
+        <!-- Testing Dependencies -->\r
+        <dependency>\r
+              <groupId>junit</groupId>\r
+              <artifactId>junit</artifactId>\r
+              <scope>test</scope>\r
+        </dependency>\r
+        <dependency>\r
+              <groupId>org.glassfish.jersey.test-framework.providers</groupId>\r
+              <artifactId>jersey-test-framework-provider-grizzly2</artifactId>\r
+              <scope>test</scope>\r
+        </dependency>\r
+        <dependency>\r
+              <groupId>org.mockito</groupId>\r
+              <artifactId>mockito-all</artifactId>\r
+              <scope>test</scope>\r
+        </dependency>\r
+    </dependencies>\r
+</project>\r
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.app.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
index daa1333..920b066 100644 (file)
@@ -85,6 +85,7 @@
     <module>messagebus-api</module>
     <module>messagebus-spi</module>
     <module>messagebus-impl</module>
+    <module>messagebus-util</module>
     <module>messagebus-config</module>
 
     <!-- PAX EXAM ITs -->
index 70cbb4e..963cd0e 100644 (file)
         <feature version='${project.version}'>odl-netconf-connector-ssh</feature>
     </feature>
 
+    <feature name='odl-message-bus' version='${project.version}'>
+        <!-- messagebus endpoint for netconf connector-->
+        <feature version='${project.version}'>odl-netconf-connector-all</feature>
+        <feature version='${mdsal.version}'>odl-message-bus-collector</feature>
+        <bundle>mvn:org.opendaylight.controller/messagebus-netconf/${netconf.version}</bundle>
+        <configfile finalname="${config.configfile.directory}/06-message-netconf.xml">mvn:org.opendaylight.controller/messagebus-netconf/${netconf.version}/xml/config</configfile>
+    </feature>
+
     <feature name='odl-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf Connector :: Netconf Connector">
         <feature version='${mdsal.version}'>odl-mdsal-broker</feature>
         <feature version='${netconf.version}'>odl-netconf-client</feature>
diff --git a/opendaylight/netconf/messagebus-netconf/pom.xml b/opendaylight/netconf/messagebus-netconf/pom.xml
new file mode 100644 (file)
index 0000000..847327c
--- /dev/null
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>netconf-subsystem</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>messagebus-netconf</artifactId>
+    <name>${project.artifactId}</name>
+
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>ietf-netconf-notifications</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-core-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>config-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>messagebus-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>messagebus-spi</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>messagebus-util</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-netconf-connector</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-config</artifactId>
+        </dependency>
+
+        <!-- Testing Dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-grizzly2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/config</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>${project.build.directory}/classes/initial/06-message-netconf.xml</file>
+                                    <type>xml</type>
+                                    <classifier>config</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java
new file mode 100644 (file)
index 0000000..f27518d
--- /dev/null
@@ -0,0 +1,39 @@
+package org.opendaylight.controller.config.yang.messagebus.netconf;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.app.util.Providers;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.core.api.Broker;
+
+public class MessageBusNetconfModule extends org.opendaylight.controller.config.yang.messagebus.netconf.AbstractMessageBusNetconfModule {
+    public MessageBusNetconfModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+        super(identifier, dependencyResolver);
+    }
+
+    public MessageBusNetconfModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.messagebus.netconf.MessageBusNetconfModule oldModule, java.lang.AutoCloseable oldInstance) {
+        super(identifier, dependencyResolver, oldModule, oldInstance);
+    }
+
+    @Override
+    public void customValidation() {}
+
+    @Override
+    public java.lang.AutoCloseable createInstance() {
+        final BindingAwareBroker.ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
+        final Broker.ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
+
+        final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class);
+        final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
+
+        final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
+        final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
+
+        return NetconfEventSourceManager.create(dataBroker, domPublish, domMount,
+            mountPointService, getEventSourceRegistryDependency(), getNamespaceToStream());
+    }
+
+}
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java
new file mode 100644 (file)
index 0000000..7681cdf
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+* Generated file
+*
+* Generated from: yang module name: messagebus-netconf yang module local name: messagebus-netconf
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Wed Jul 29 14:15:30 CEST 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.messagebus.netconf;
+public class MessageBusNetconfModuleFactory extends org.opendaylight.controller.config.yang.messagebus.netconf.AbstractMessageBusNetconfModuleFactory {
+
+}
@@ -7,15 +7,15 @@
  */
 package org.opendaylight.controller.messagebus.eventsources.netconf;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.transform.dom.DOMSource;
-
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
@@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
 public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
 
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class);
 
-    public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath.create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status"));
-    private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(EventSourceStatusNotification.QNAME);
+    public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath
+        .create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status"));
+    private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(
+        EventSourceStatusNotification.QNAME);
     private static final String XMLNS_ATTRIBUTE_KEY = "xmlns";
     private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/";
 
@@ -49,16 +48,16 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe
     private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
 
     public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
-        super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
+        super(NotificationSourceType.ConnectionStatusChange, SourceName,
+            EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
         this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener);
         LOG.info("Connection notification source has been initialized.");
         setActive(true);
         setReplaySupported(false);
     }
 
-    @Override
-    public void close() throws Exception {
-        if(isActive()){
+    @Override public void close() throws Exception {
+        if (isActive()) {
             LOG.debug("Connection notification - publish Deactive");
             publishNotification(EventSourceStatus.Deactive);
             notificationTopicMap.clear();
@@ -66,36 +65,32 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe
         }
     }
 
-    @Override
-    void activateNotificationSource() {
+    @Override void activateNotificationSource() {
         LOG.debug("Connection notification - publish Active");
         publishNotification(EventSourceStatus.Active);
     }
 
-    @Override
-    void deActivateNotificationSource() {
+    @Override void deActivateNotificationSource() {
         LOG.debug("Connection notification - publish Inactive");
         publishNotification(EventSourceStatus.Inactive);
     }
 
-    @Override
-    void reActivateNotificationSource() {
+    @Override void reActivateNotificationSource() {
         LOG.debug("Connection notification - reactivate - publish active");
         publishNotification(EventSourceStatus.Active);
     }
 
-    @Override
-    boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
-        if(checkNotificationPath(notificationPath) == false){
+    @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+        if (checkNotificationPath(notificationPath) == false) {
             LOG.debug("Bad SchemaPath for notification try to register");
             return false;
         }
         ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
-        if(topicIds == null){
+        if (topicIds == null) {
             topicIds = new ArrayList<>();
             topicIds.add(topicId);
         } else {
-            if(topicIds.contains(topicId) == false){
+            if (topicIds.contains(topicId) == false) {
                 topicIds.add(topicId);
             }
         }
@@ -103,57 +98,50 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe
         return true;
     }
 
-    @Override
-    ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
+    @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
         return notificationTopicMap.get(notificationPath);
     }
 
-    @Override
-    synchronized void unRegisterNotificationTopic(TopicId topicId) {
+    @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
         List<SchemaPath> notificationPathToRemove = new ArrayList<>();
-        for(SchemaPath notifKey : notificationTopicMap.keySet()){
+        for (SchemaPath notifKey : notificationTopicMap.keySet()) {
             ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
-            if(topicList != null){
+            if (topicList != null) {
                 topicList.remove(topicId);
-                if(topicList.isEmpty()){
+                if (topicList.isEmpty()) {
                     notificationPathToRemove.add(notifKey);
                 }
             }
         }
-        for(SchemaPath notifKey : notificationPathToRemove){
+        for (SchemaPath notifKey : notificationPathToRemove) {
             notificationTopicMap.remove(notifKey);
         }
     }
 
-    private void publishNotification(EventSourceStatus eventSourceStatus){
+    private void publishNotification(EventSourceStatus eventSourceStatus) {
 
         final EventSourceStatusNotification notification = new EventSourceStatusNotificationBuilder()
-                    .setStatus(eventSourceStatus)
-                    .build();
+            .setStatus(eventSourceStatus).build();
         domNotificationListener.onNotification(createNotification(notification));
     }
 
-    private DOMNotification createNotification(EventSourceStatusNotification notification){
-        final ContainerNode cn = Builders.containerBuilder()
-                .withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
-                .withChild(encapsulate(notification))
-                .build();
+    private DOMNotification createNotification(EventSourceStatusNotification notification) {
+        final ContainerNode cn = Builders.containerBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
+            .withChild(encapsulate(notification)).build();
         DOMNotification dn = new DOMNotification() {
 
-            @Override
-            public SchemaPath getType() {
+            @Override public SchemaPath getType() {
                 return EVENT_SOURCE_STATUS_PATH;
             }
 
-            @Override
-            public ContainerNode getBody() {
+            @Override public ContainerNode getBody() {
                 return cn;
             }
         };
         return dn;
     }
 
-    private AnyXmlNode encapsulate(EventSourceStatusNotification notification){
+    private AnyXmlNode encapsulate(EventSourceStatusNotification notification) {
 
         DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
         DocumentBuilder docBuilder;
@@ -167,25 +155,23 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe
         Document doc = docBuilder.newDocument();
 
         final Optional<String> namespace = Optional.of(EVENT_SOURCE_STATUS_ARG.getNodeType().getNamespace().toString());
-        final Element rootElement = createElement(doc , "EventSourceStatusNotification", namespace);
+        final Element rootElement = createElement(doc, "EventSourceStatusNotification", namespace);
 
         final Element sourceElement = doc.createElement("status");
         sourceElement.appendChild(doc.createTextNode(notification.getStatus().name()));
         rootElement.appendChild(sourceElement);
 
-
         return Builders.anyXmlBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG)
-                     .withValue(new DOMSource(rootElement))
-                     .build();
+            .withValue(new DOMSource(rootElement)).build();
 
     }
 
     // Helper to create root XML element with correct namespace and attribute
     private Element createElement(final Document document, final String qName, final Optional<String> namespaceURI) {
-        if(namespaceURI.isPresent()) {
+        if (namespaceURI.isPresent()) {
             final Element element = document.createElementNS(namespaceURI.get(), qName);
             String name = XMLNS_ATTRIBUTE_KEY;
-            if(element.getPrefix() != null) {
+            if (element.getPrefix() != null) {
                 name += ":" + element.getPrefix();
             }
             element.setAttributeNS(XMLNS_URI, name, namespaceURI.get());
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java
new file mode 100644 (file)
index 0000000..6ef3277
--- /dev/null
@@ -0,0 +1,340 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification;
+import org.opendaylight.controller.messagebus.app.util.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+public class NetconfEventSource implements EventSource, DOMNotificationListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
+
+    private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
+    private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(
+        QName.create(TopicNotification.QNAME, "node-id"));
+    private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(
+        QName.create(TopicNotification.QNAME, "topic-id"));
+    private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(
+        QName.create(TopicNotification.QNAME, "payload"));
+    private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
+
+    private final String nodeId;
+    private final Node node;
+
+    private final DOMMountPoint netconfMount;
+    private final MountPoint mountPoint;
+    private final DOMNotificationPublishService domPublish;
+
+    private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
+    private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
+
+    public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount,
+        final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
+        this.netconfMount = Preconditions.checkNotNull(netconfMount);
+        this.mountPoint = Preconditions.checkNotNull(mountPoint);
+        this.node = Preconditions.checkNotNull(node);
+        this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
+        this.domPublish = Preconditions.checkNotNull(publishService);
+        this.nodeId = node.getNodeId().getValue();
+        this.initializeNotificationTopicRegistrationList();
+
+        LOG.info("NetconfEventSource [{}] created.", this.nodeId);
+    }
+
+    private void initializeNotificationTopicRegistrationList() {
+        notificationTopicRegistrationList
+            .add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
+        Optional<Map<String, Stream>> streamMap = getAvailableStreams();
+        if (streamMap.isPresent()) {
+            LOG.debug("Stream configuration compare...");
+            for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
+                final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+                LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
+                if (streamMap.get().containsKey(streamName)) {
+                    LOG.debug("Stream containig on device");
+                    notificationTopicRegistrationList
+                        .add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName), urnPrefix, this));
+                }
+            }
+        }
+    }
+
+    private Optional<Map<String, Stream>> getAvailableStreams() {
+
+        Map<String, Stream> streamMap = null;
+        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+        Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
+
+        if (dataBroker.isPresent()) {
+            LOG.debug("GET Available streams ...");
+            ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
+            CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx
+                .read(LogicalDatastoreType.OPERATIONAL, pathStream);
+
+            try {
+                Optional<Streams> streams = checkFeature.checkedGet();
+                if (streams.isPresent()) {
+                    streamMap = new HashMap<>();
+                    for (Stream stream : streams.get().getStream()) {
+                        LOG.debug("*** find stream {}", stream.getName().getValue());
+                        streamMap.put(stream.getName().getValue(), stream);
+                    }
+                }
+            } catch (ReadFailedException e) {
+                LOG.warn("Can not read streams for node {}", this.nodeId);
+            }
+
+        } else {
+            LOG.warn("No databroker on node {}", this.nodeId);
+        }
+
+        return Optional.fromNullable(streamMap);
+    }
+
+    @Override public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
+        LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
+        final NotificationPattern notificationPattern = input.getNotificationPattern();
+        final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
+        return registerTopic(input.getTopicId(), matchingNotifications);
+
+    }
+
+    @Override public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+            reg.unRegisterNotificationTopic(input.getTopicId());
+        }
+        return Util.resultRpcSuccessFor((Void) null);
+    }
+
+    private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId,
+        final List<SchemaPath> notificationsToSubscribe) {
+        LOG.debug("Join topic {} - register", topicId);
+        JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
+        if (notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false) {
+            LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size());
+            final Optional<DOMNotificationService> notifyService = getDOMMountPoint()
+                .getService(DOMNotificationService.class);
+            if (notifyService.isPresent()) {
+                int registeredNotificationCount = 0;
+                for (SchemaPath schemaNotification : notificationsToSubscribe) {
+                    for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+                        LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(),
+                            schemaNotification.getLastComponent().getLocalName());
+                        if (reg.checkNotificationPath(schemaNotification)) {
+                            LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(),
+                                topicId.getValue());
+                            boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+                            if (regSuccess) {
+                                registeredNotificationCount = registeredNotificationCount + 1;
+                            }
+                        }
+                    }
+                }
+                if (registeredNotificationCount > 0) {
+                    joinTopicStatus = JoinTopicStatus.Up;
+                }
+            } else {
+                LOG.warn("NO DOMNotification service on node {}", this.nodeId);
+            }
+        } else {
+            LOG.debug("Notifications to subscribe has NOT found");
+        }
+
+        final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
+        return immediateFuture(RpcResultBuilder.success(output).build());
+
+    }
+
+    public void reActivateStreams() {
+        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+            LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId);
+            reg.reActivateNotificationSource();
+        }
+    }
+
+    public void deActivateStreams() {
+        for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
+            LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
+            reg.deActivateNotificationSource();
+        }
+    }
+
+    @Override public void onNotification(final DOMNotification notification) {
+        SchemaPath notificationPath = notification.getType();
+        Date notificationEventTime = null;
+        if (notification instanceof DOMEvent) {
+            notificationEventTime = ((DOMEvent) notification).getEventTime();
+        }
+        for (NotificationTopicRegistration notifReg : notificationTopicRegistrationList) {
+            ArrayList<TopicId> topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath);
+            if (topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false) {
+
+                if (notifReg instanceof StreamNotificationTopicRegistration) {
+                    StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration) notifReg;
+                    streamReg.setLastEventTime(notificationEventTime);
+                }
+
+                for (TopicId topicId : topicIdsForNotification) {
+                    publishNotification(notification, topicId);
+                    LOG.debug("Notification {} has been published for TopicId {}", notification.getType(),
+                        topicId.getValue());
+                }
+
+            }
+        }
+    }
+
+    private void publishNotification(final DOMNotification notification, TopicId topicId) {
+        final ContainerNode topicNotification = Builders.containerBuilder().withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
+            .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
+            .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)).withChild(encapsulate(notification))
+            .build();
+        try {
+            domPublish.putNotification(new TopicDOMNotification(topicNotification));
+        } catch (final InterruptedException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    private AnyXmlNode encapsulate(final DOMNotification body) {
+        // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
+        final Document doc = XmlUtil.newDocument();
+        final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
+        final Element element = XmlUtil.createElement(doc, "payload", namespace);
+
+        final DOMResult result = new DOMResult(element);
+
+        final SchemaContext context = getDOMMountPoint().getSchemaContext();
+        final SchemaPath schemaPath = body.getType();
+        try {
+            NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
+            return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG).withValue(new DOMSource(element)).build();
+        } catch (IOException | XMLStreamException e) {
+            LOG.error("Unable to encapsulate notification.", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern) {
+        // FIXME: default language should already be regex
+        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+        final Pattern pattern = Pattern.compile(regex);
+        List<SchemaPath> availableNotifications = getAvailableNotifications();
+        if (availableNotifications == null || availableNotifications.isEmpty()) {
+            return null;
+        }
+        return Util.expandQname(availableNotifications, pattern);
+    }
+
+    @Override public void close() throws Exception {
+        for (NotificationTopicRegistration streamReg : notificationTopicRegistrationList) {
+            streamReg.close();
+        }
+    }
+
+    @Override public NodeKey getSourceNodeKey() {
+        return getNode().getKey();
+    }
+
+    @Override public List<SchemaPath> getAvailableNotifications() {
+
+        final List<SchemaPath> availNotifList = new ArrayList<>();
+        // add Event Source Connection status notification
+        availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
+
+        // FIXME: use SchemaContextListener to get changes asynchronously
+        final Set<NotificationDefinition> availableNotifications = getDOMMountPoint().getSchemaContext()
+            .getNotifications();
+        // add all known notifications from netconf device
+        for (final NotificationDefinition nd : availableNotifications) {
+            availNotifList.add(nd.getPath());
+        }
+        return availNotifList;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    DOMMountPoint getDOMMountPoint() {
+        return netconfMount;
+    }
+
+    MountPoint getMountPoint() {
+        return mountPoint;
+    }
+
+    NetconfNode getNetconfNode() {
+        return node.getAugmentation(NetconfNode.class);
+    }
+
+}
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java
new file mode 100644 (file)
index 0000000..738bd88
--- /dev/null
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class);
+    private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(
+        new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+    private static final InstanceIdentifier<Node> NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class)
+        .child(Topology.class, NETCONF_TOPOLOGY_KEY).child(Node.class);
+
+    private final Map<String, String> streamMap;
+    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>();
+    private final DOMNotificationPublishService publishService;
+    private final DOMMountPointService domMounts;
+    private final MountPointService mountPointService;
+    private ListenerRegistration<DataChangeListener> listenerRegistration;
+    private final EventSourceRegistry eventSourceRegistry;
+
+    public static NetconfEventSourceManager create(final DataBroker dataBroker,
+        final DOMNotificationPublishService domPublish, final DOMMountPointService domMount,
+        final MountPointService bindingMount, final EventSourceRegistry eventSourceRegistry,
+        final List<NamespaceToStream> namespaceMapping) {
+
+        final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(domPublish, domMount,
+            bindingMount, eventSourceRegistry, namespaceMapping);
+
+        eventSourceManager.initialize(dataBroker);
+
+        return eventSourceManager;
+
+    }
+
+    private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
+        final DOMMountPointService domMount, final MountPointService bindingMount,
+        final EventSourceRegistry eventSourceRegistry, final List<NamespaceToStream> namespaceMapping) {
+
+        Preconditions.checkNotNull(domPublish);
+        Preconditions.checkNotNull(domMount);
+        Preconditions.checkNotNull(bindingMount);
+        Preconditions.checkNotNull(eventSourceRegistry);
+        Preconditions.checkNotNull(namespaceMapping);
+        this.streamMap = namespaceToStreamMapping(namespaceMapping);
+        this.domMounts = domMount;
+        this.mountPointService = bindingMount;
+        this.publishService = domPublish;
+        this.eventSourceRegistry = eventSourceRegistry;
+    }
+
+    private void initialize(final DataBroker dataBroker) {
+        Preconditions.checkNotNull(dataBroker);
+        listenerRegistration = dataBroker
+            .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this,
+                DataChangeScope.SUBTREE);
+        LOG.info("NetconfEventSourceManager initialized.");
+    }
+
+    private Map<String, String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
+        final Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
+
+        for (final NamespaceToStream nToS : namespaceMapping) {
+            streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
+        }
+
+        return streamMap;
+    }
+
+    @Override public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+
+        LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
+            if (changeEntry.getValue() instanceof Node) {
+                nodeCreated(changeEntry.getKey(), (Node) changeEntry.getValue());
+            }
+        }
+
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+            if (changeEntry.getValue() instanceof Node) {
+                nodeUpdated(changeEntry.getKey(), (Node) changeEntry.getValue());
+            }
+        }
+
+        for (InstanceIdentifier<?> removePath : event.getRemovedPaths()) {
+            DataObject removeObject = event.getOriginalData().get(removePath);
+            if (removeObject instanceof Node) {
+                nodeRemoved(removePath);
+            }
+        }
+
+    }
+
+    private void nodeCreated(final InstanceIdentifier<?> key, final Node node) {
+        Preconditions.checkNotNull(key);
+        if (validateNode(node) == false) {
+            LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString());
+            return;
+        }
+        LOG.info("Netconf event source [{}] is creating...", key.toString());
+        NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this);
+        if (nesr != null) {
+            NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr);
+            if (nesrOld != null) {
+                nesrOld.close();
+            }
+        }
+    }
+
+    private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
+        Preconditions.checkNotNull(key);
+        if (validateNode(node) == false) {
+            LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString());
+            return;
+        }
+
+        LOG.info("Netconf event source [{}] is updating...", key.toString());
+        NetconfEventSourceRegistration nesr = registrationMap.get(key);
+        if (nesr != null) {
+            nesr.updateStatus();
+        } else {
+            nodeCreated(key, node);
+        }
+    }
+
+    private void nodeRemoved(final InstanceIdentifier<?> key) {
+        Preconditions.checkNotNull(key);
+        LOG.info("Netconf event source [{}] is removing...", key.toString());
+        NetconfEventSourceRegistration nesr = registrationMap.remove(key);
+        if (nesr != null) {
+            nesr.close();
+        }
+    }
+
+    private boolean validateNode(final Node node) {
+        if (node == null) {
+            return false;
+        }
+        return isNetconfNode(node);
+    }
+
+    Map<String, String> getStreamMap() {
+        return streamMap;
+    }
+
+    DOMNotificationPublishService getPublishService() {
+        return publishService;
+    }
+
+    DOMMountPointService getDomMounts() {
+        return domMounts;
+    }
+
+    EventSourceRegistry getEventSourceRegistry() {
+        return eventSourceRegistry;
+    }
+
+    MountPointService getMountPointService() {
+        return mountPointService;
+    }
+
+    private boolean isNetconfNode(final Node node) {
+        return node.getAugmentation(NetconfNode.class) != null;
+    }
+
+    @Override public void close() {
+        listenerRegistration.close();
+        for (final NetconfEventSourceRegistration reg : registrationMap.values()) {
+            reg.close();
+        }
+        registrationMap.clear();
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java
new file mode 100644 (file)
index 0000000..18cdccc
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to keep connection status of netconf node  and event source registration object
+ */
+public class NetconfEventSourceRegistration implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class);
+    private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
+        .node(NetworkTopology.QNAME).node(Topology.QNAME)
+        .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), TopologyNetconf.QNAME.getLocalName())
+        .node(Node.QNAME).build();
+    private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "node-id");
+    private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+    private final Node node;
+    private final InstanceIdentifier<?> instanceIdent;
+    private final NetconfEventSourceManager netconfEventSourceManager;
+    private ConnectionStatus currentNetconfConnStatus;
+    private EventSourceRegistration<NetconfEventSource> eventSourceRegistration;
+
+    public static NetconfEventSourceRegistration create(final InstanceIdentifier<?> instanceIdent, final Node node,
+        final NetconfEventSourceManager netconfEventSourceManager) {
+        Preconditions.checkNotNull(instanceIdent);
+        Preconditions.checkNotNull(node);
+        Preconditions.checkNotNull(netconfEventSourceManager);
+        if (isEventSource(node) == false) {
+            return null;
+        }
+        NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node,
+            netconfEventSourceManager);
+        nesr.updateStatus();
+        LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...", node.getNodeId().getValue());
+        return nesr;
+    }
+
+    private static boolean isEventSource(final Node node) {
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+        if (netconfNode == null) {
+            return false;
+        }
+        if (netconfNode.getAvailableCapabilities() == null) {
+            return false;
+        }
+        final List<String> capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability();
+        if (capabilities == null || capabilities.isEmpty()) {
+            return false;
+        }
+        for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) {
+            if (capability.startsWith(NotificationCapabilityPrefix)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private NetconfEventSourceRegistration(final InstanceIdentifier<?> instanceIdent, final Node node,
+        final NetconfEventSourceManager netconfEventSourceManager) {
+        this.instanceIdent = instanceIdent;
+        this.node = node;
+        this.netconfEventSourceManager = netconfEventSourceManager;
+        this.eventSourceRegistration = null;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    Optional<EventSourceRegistration<NetconfEventSource>> getEventSourceRegistration() {
+        return Optional.fromNullable(eventSourceRegistration);
+    }
+
+    NetconfNode getNetconfNode() {
+        return node.getAugmentation(NetconfNode.class);
+    }
+
+    void updateStatus() {
+        ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus();
+        LOG.info("Change status on node {}, new status is {}", this.node.getNodeId().getValue(), netconfConnStatus);
+        if (netconfConnStatus.equals(currentNetconfConnStatus)) {
+            return;
+        }
+        changeStatus(netconfConnStatus);
+    }
+
+    private boolean checkConnectionStatusType(ConnectionStatus status) {
+        if (status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting
+            || status == ConnectionStatus.UnableToConnect) {
+            return true;
+        }
+        return false;
+    }
+
+    private void changeStatus(ConnectionStatus newStatus) {
+        Preconditions.checkNotNull(newStatus);
+        if (checkConnectionStatusType(newStatus) == false) {
+            throw new IllegalStateException("Unknown new Netconf Connection Status");
+        }
+        if (this.currentNetconfConnStatus == null) {
+            if (newStatus == ConnectionStatus.Connected) {
+                registrationEventSource();
+            }
+        } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting) {
+            if (newStatus == ConnectionStatus.Connected) {
+                if (this.eventSourceRegistration == null) {
+                    registrationEventSource();
+                } else {
+                    // reactivate stream on registered event source (invoke publish notification about connection)
+                    this.eventSourceRegistration.getInstance().reActivateStreams();
+                }
+            }
+        } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) {
+
+            if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) {
+                // deactivate streams on registered event source (invoke publish notification about connection)
+                this.eventSourceRegistration.getInstance().deActivateStreams();
+            }
+        } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect) {
+            if (newStatus == ConnectionStatus.Connected) {
+                if (this.eventSourceRegistration == null) {
+                    registrationEventSource();
+                } else {
+                    // reactivate stream on registered event source (invoke publish notification about connection)
+                    this.eventSourceRegistration.getInstance().reActivateStreams();
+                }
+            }
+        } else {
+            throw new IllegalStateException("Unknown current Netconf Connection Status");
+        }
+        this.currentNetconfConnStatus = newStatus;
+    }
+
+    private void registrationEventSource() {
+        final Optional<MountPoint> mountPoint = netconfEventSourceManager.getMountPointService()
+            .getMountPoint(instanceIdent);
+        final Optional<DOMMountPoint> domMountPoint = netconfEventSourceManager.getDomMounts()
+            .getMountPoint(domMountPath(node.getNodeId()));
+        EventSourceRegistration<NetconfEventSource> registration = null;
+        if (domMountPoint.isPresent() && mountPoint.isPresent()) {
+            final NetconfEventSource netconfEventSource = new NetconfEventSource(node,
+                netconfEventSourceManager.getStreamMap(), domMountPoint.get(), mountPoint.get(),
+                netconfEventSourceManager.getPublishService());
+            registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource);
+            LOG.info("Event source {} has been registered", node.getNodeId().getValue());
+        }
+        this.eventSourceRegistration = registration;
+    }
+
+    private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
+        return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH)
+            .nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+    }
+
+    private void closeEventSourceRegistration() {
+        if (getEventSourceRegistration().isPresent()) {
+            getEventSourceRegistration().get().close();
+        }
+    }
+
+    @Override public void close() {
+        closeEventSourceRegistration();
+    }
+
+}
@@ -8,18 +8,16 @@
 package org.opendaylight.controller.messagebus.eventsources.netconf;
 
 import java.util.ArrayList;
-
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public abstract class NotificationTopicRegistration implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class);
 
-    public enum NotificationSourceType{
+    public enum NotificationSourceType {
         NetconfDeviceStream,
         ConnectionStatusChange;
     }
@@ -30,7 +28,8 @@ public abstract class NotificationTopicRegistration implements AutoCloseable {
     private final String notificationUrnPrefix;
     private boolean replaySupported;
 
-    protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) {
+    protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName,
+        String notificationUrnPrefix) {
         this.notificationSourceType = notificationSourceType;
         this.sourceName = sourceName;
         this.notificationUrnPrefix = notificationUrnPrefix;
@@ -58,14 +57,16 @@ public abstract class NotificationTopicRegistration implements AutoCloseable {
         return notificationUrnPrefix;
     }
 
-    public boolean checkNotificationPath(SchemaPath notificationPath){
-        if(notificationPath == null){
+    public boolean checkNotificationPath(SchemaPath notificationPath) {
+        if (notificationPath == null) {
             return false;
         }
         String nameSpace = notificationPath.getLastComponent().toString();
-        LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, getNotificationUrnPrefix());
+        LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace,
+            getNotificationUrnPrefix());
         return nameSpace.startsWith(getNotificationUrnPrefix());
     }
+
     abstract void activateNotificationSource();
 
     abstract void deActivateNotificationSource();
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java
new file mode 100644 (file)
index 0000000..21d4cde
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
+    private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(
+        QName.create(CreateSubscriptionInput.QNAME, "stream"));
+    private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath
+        .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
+    private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(
+        QName.create(CreateSubscriptionInput.QNAME, "startTime"));
+
+    final private DOMMountPoint domMountPoint;
+    final private String nodeId;
+    final private NetconfEventSource netconfEventSource;
+    final private Stream stream;
+    private Date lastEventTime;
+
+    private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+    public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
+        NetconfEventSource netconfEventSource) {
+        super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
+        this.domMountPoint = netconfEventSource.getDOMMountPoint();
+        this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
+        this.netconfEventSource = netconfEventSource;
+        this.stream = stream;
+        this.lastEventTime = null;
+        setReplaySupported(this.stream.isReplaySupport());
+        setActive(false);
+        LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
+    }
+
+    void activateNotificationSource() {
+        if (isActive() == false) {
+            LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
+            final ContainerNode input = Builders.containerBuilder()
+                .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+                .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build();
+            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
+                .invokeRpc(CREATE_SUBSCRIPTION, input);
+            try {
+                csFuture.checkedGet();
+                setActive(true);
+            } catch (DOMRpcException e) {
+                LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+                setActive(false);
+                return;
+            }
+        } else {
+            LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
+        }
+    }
+
+    void reActivateNotificationSource() {
+        if (isActive()) {
+            LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
+            DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder = Builders.containerBuilder()
+                .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+                .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
+            if (isReplaySupported() && this.getLastEventTime() != null) {
+                inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
+            }
+            final ContainerNode input = inputBuilder.build();
+            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
+                .invokeRpc(CREATE_SUBSCRIPTION, input);
+            try {
+                csFuture.checkedGet();
+                setActive(true);
+            } catch (DOMRpcException e) {
+                LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+                setActive(false);
+                return;
+            }
+        }
+    }
+
+    @Override void deActivateNotificationSource() {
+        // no operations need
+    }
+
+    private void closeStream() {
+        if (isActive()) {
+            for (ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()) {
+                reg.close();
+            }
+            notificationRegistrationMap.clear();
+            notificationTopicMap.clear();
+            setActive(false);
+        }
+    }
+
+    private String getStreamName() {
+        return getSourceName();
+    }
+
+    @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
+        return notificationTopicMap.get(notificationPath);
+    }
+
+    @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
+
+        if (checkNotificationPath(notificationPath) == false) {
+            LOG.debug("Bad SchemaPath for notification try to register");
+            return false;
+        }
+
+        final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
+        if (notifyService.isPresent() == false) {
+            LOG.debug("DOMNotificationService is not present");
+            return false;
+        }
+
+        activateNotificationSource();
+        if (isActive() == false) {
+            LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
+                notificationPath.toString());
+            return false;
+        }
+
+        ListenerRegistration<NetconfEventSource> registration = notifyService.get()
+            .registerNotificationListener(this.netconfEventSource, notificationPath);
+        notificationRegistrationMap.put(notificationPath, registration);
+        ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+        if (topicIds == null) {
+            topicIds = new ArrayList<>();
+            topicIds.add(topicId);
+        } else {
+            if (topicIds.contains(topicId) == false) {
+                topicIds.add(topicId);
+            }
+        }
+
+        notificationTopicMap.put(notificationPath, topicIds);
+        return true;
+    }
+
+    @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
+        List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+        for (SchemaPath notifKey : notificationTopicMap.keySet()) {
+            ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+            if (topicList != null) {
+                topicList.remove(topicId);
+                if (topicList.isEmpty()) {
+                    notificationPathToRemove.add(notifKey);
+                }
+            }
+        }
+        for (SchemaPath notifKey : notificationPathToRemove) {
+            notificationTopicMap.remove(notifKey);
+            ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+            if (reg != null) {
+                reg.close();
+            }
+        }
+    }
+
+    Optional<Date> getLastEventTime() {
+        return Optional.fromNullable(lastEventTime);
+    }
+
+    void setLastEventTime(Date lastEventTime) {
+        this.lastEventTime = lastEventTime;
+    }
+
+    @Override public void close() throws Exception {
+        closeStream();
+    }
+
+}
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml b/opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml
new file mode 100644 (file)
index 0000000..421085a
--- /dev/null
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+    <configuration>
+      <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+          <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+              <module>
+                  <name>messagebus-netconf</name>
+                  <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">binding-impl:messagebus-netconf</type>
+                  <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+                      <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+                      <name>dom-broker</name>
+                  </dom-broker>
+                  <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+                      <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+                      <name>binding-osgi-broker</name>
+                  </binding-broker>
+                  <event-source-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+                      <type xmlns:mb-esr="urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry">mb-esr:event-source-registry</type>
+                      <name>messagebus-app-impl</name>
+                  </event-source-registry>
+                  <namespace-to-stream xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+                      <urn-prefix>urn:ietf:params:xml:ns:yang:smiv2</urn-prefix>
+                      <stream-name>SNMP</stream-name>
+                  </namespace-to-stream>
+                  <namespace-to-stream xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf">
+                      <urn-prefix>urn:ietf:params:xml:ns:yang:ietf-syslog-notification</urn-prefix>
+                      <stream-name>SYSLOG</stream-name>
+                  </namespace-to-stream>
+              </module>
+          </modules>
+      </data>
+  </configuration>
+  <required-capabilities>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf?module=messagebus-netconf&amp;revision=2015-07-28</capability>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry?module=messagebus-event-source-registry&amp;revision=2015-04-02</capability>
+  </required-capabilities>
+</snapshot>
diff --git a/opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang b/opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang
new file mode 100644 (file)
index 0000000..780175b
--- /dev/null
@@ -0,0 +1,68 @@
+module messagebus-netconf {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf";
+    prefix "msgb-netconf";
+
+    import config { prefix config; revision-date 2013-04-05; }
+    import opendaylight-md-sal-binding {prefix sal;}
+    import opendaylight-md-sal-dom {prefix dom;}
+    import messagebus-event-source-registry {prefix esr;}
+
+    description
+        "Message bus netconf event source";
+
+    revision "2015-07-28" {
+        description "Message bus netconf event source initial definition";
+    }
+
+    identity messagebus-netconf {
+        base config:module-type;
+        config:java-name-prefix MessageBusNetconf;
+    }
+
+    augment "/config:modules/config:module/config:configuration" {
+        case messagebus-netconf {
+            when "/config:modules/config:module/config:type = 'messagebus-netconf'";
+
+            container event-source-registry {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity esr:event-source-registry;
+                    }
+                }
+            }
+
+            container dom-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity dom:dom-broker-osgi-registry;
+                    }
+                }
+            }
+
+            container binding-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity sal:binding-broker-osgi-registry;
+                    }
+                }
+            }
+
+            list namespace-to-stream {
+                key urn-prefix;
+
+                leaf urn-prefix {
+                    type string;
+                }
+
+                leaf stream-name {
+                    type string;
+                }
+            }
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java
new file mode 100644 (file)
index 0000000..c85bf47
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class NetconfEventSourceManagerTest {
+
+    NetconfEventSourceManager netconfEventSourceManager;
+    ListenerRegistration listenerRegistrationMock;
+    DOMMountPointService domMountPointServiceMock;
+    MountPointService mountPointServiceMock;
+    EventSourceRegistry eventSourceTopologyMock;
+    AsyncDataChangeEvent asyncDataChangeEventMock;
+    RpcProviderRegistry rpcProviderRegistryMock;
+    EventSourceRegistry eventSourceRegistry;
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        DataBroker dataBrokerMock = mock(DataBroker.class);
+        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+        domMountPointServiceMock = mock(DOMMountPointService.class);
+        mountPointServiceMock = mock(MountPointService.class);
+        eventSourceTopologyMock = mock(EventSourceRegistry.class);
+        rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+        eventSourceRegistry = mock(EventSourceRegistry.class);
+        List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
+
+        listenerRegistrationMock = mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(
+            AsyncDataBroker.DataChangeScope.SUBTREE));
+
+        Optional<DOMMountPoint> optionalDomMountServiceMock = (Optional<DOMMountPoint>) mock(Optional.class);
+        doReturn(true).when(optionalDomMountServiceMock).isPresent();
+        doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+
+        DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
+        doReturn(domMountPointMock).when(optionalDomMountServiceMock).get();
+
+
+        Optional optionalBindingMountMock = mock(Optional.class);
+        doReturn(true).when(optionalBindingMountMock).isPresent();
+
+        MountPoint mountPointMock = mock(MountPoint.class);
+        doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
+        doReturn(mountPointMock).when(optionalBindingMountMock).get();
+
+        Optional optionalMpDataBroker = mock(Optional.class);
+        DataBroker mpDataBroker = mock(DataBroker.class);
+        doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+        doReturn(true).when(optionalMpDataBroker).isPresent();
+        doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+        ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+        doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+        CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+        doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+        Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+        doReturn(avStreams).when(checkFeature).checkedGet();
+
+        EventSourceRegistration esrMock = mock(EventSourceRegistration.class);
+
+        netconfEventSourceManager =
+                NetconfEventSourceManager
+                    .create(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
+                        mountPointServiceMock, eventSourceRegistry, namespaceToStreamList);
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception {
+        onDataChangedTestHelper(true,false,true, NetconfTestUtils.notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception {
+        onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception {
+        onDataChangedTestHelper(false,true,false, NetconfTestUtils.notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception {
+        onDataChangedTestHelper(true,false,true,"bad-prefix");
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
+    }
+
+    private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{
+        asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+        Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
+        Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
+
+        Node node01;
+        String nodeId = "Node01";
+        doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
+        doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
+
+        if(isNetconf){
+            node01 = NetconfTestUtils
+                .getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix);
+
+        } else {
+            node01 = NetconfTestUtils.getNode(nodeId);
+        }
+
+        if(create){
+            mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+        }
+        if(update){
+            mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01);
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java
new file mode 100644 (file)
index 0000000..ed548ca
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.BindingService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class NetconfEventSourceTest {
+
+    NetconfEventSource netconfEventSource;
+    DOMMountPoint domMountPointMock;
+    MountPoint mountPointMock;
+    JoinTopicInput joinTopicInputMock;
+
+    @Before
+    public void setUp() throws Exception {
+        Map<String, String> streamMap = new HashMap<>();
+        streamMap.put("uriStr1", "string2");
+        domMountPointMock = mock(DOMMountPoint.class);
+        mountPointMock = mock(MountPoint.class);
+        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+        RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
+        Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
+        NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+        doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+
+        Optional<DataBroker> optionalMpDataBroker = (Optional<DataBroker>) mock(Optional.class);
+        DataBroker mpDataBroker = mock(DataBroker.class);
+        doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class);
+        doReturn(true).when(optionalMpDataBroker).isPresent();
+        doReturn(mpDataBroker).when(optionalMpDataBroker).get();
+
+        ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class);
+        doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction();
+        CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = (CheckedFuture<Optional<Streams>, ReadFailedException>)mock(CheckedFuture.class);
+        InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+        doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream);
+        Optional<Streams> avStreams = NetconfTestUtils.getAvailableStream("stream01", true);
+        doReturn(avStreams).when(checkFeature).checkedGet();
+
+        netconfEventSource = new NetconfEventSource(
+                NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected,
+                    NetconfTestUtils.notification_capability_prefix),
+                streamMap,
+                domMountPointMock,
+                mountPointMock ,
+                domNotificationPublishServiceMock);
+
+    }
+
+    @Test
+    public void joinTopicTest() throws Exception{
+        joinTopicTestHelper();
+        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+    }
+
+    private void joinTopicTestHelper() throws Exception{
+        joinTopicInputMock = mock(JoinTopicInput.class);
+        TopicId topicId = new TopicId("topicID007");
+        doReturn(topicId).when(joinTopicInputMock).getTopicId();
+        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+        doReturn("uriStr1").when(notificationPatternMock).getValue();
+
+        SchemaContext schemaContextMock = mock(SchemaContext.class);
+        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+        notificationDefinitionSet.add(notificationDefinitionMock);
+
+        URI uri = new URI("uriStr1");
+        QName qName = new QName(uri, "localName1");
+        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+        ListenerRegistration<NetconfEventSource> listenerRegistrationMock = (ListenerRegistration<NetconfEventSource>)mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class));
+
+        Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+        doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+        doReturn(true).when(optionalMock).isPresent();
+        DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+        doReturn(domRpcServiceMock).when(optionalMock).get();
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java
new file mode 100644 (file)
index 0000000..3261ddb
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public final class NetconfTestUtils {
+
+    public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
+
+    private NetconfTestUtils() {
+    }
+
+    public static Node getNetconfNode(String nodeIdent, String hostName, ConnectionStatus cs,
+        String notificationCapabilityPrefix) {
+
+        DomainName dn = new DomainName(hostName);
+        Host host = new Host(dn);
+
+        List<String> avCapList = new ArrayList<>();
+        avCapList.add(notificationCapabilityPrefix + "_availableCapabilityString1");
+        AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build();
+        NetconfNode nn = new NetconfNodeBuilder().setConnectionStatus(cs).setHost(host).setAvailableCapabilities(avCaps)
+            .build();
+
+        NodeId nodeId = new NodeId(nodeIdent);
+        NodeKey nk = new NodeKey(nodeId);
+        NodeBuilder nb = new NodeBuilder();
+        nb.setKey(nk);
+
+        nb.addAugmentation(NetconfNode.class, nn);
+        return nb.build();
+    }
+
+    public static Node getNode(String nodeIdent) {
+        NodeId nodeId = new NodeId(nodeIdent);
+        NodeKey nk = new NodeKey(nodeId);
+        NodeBuilder nb = new NodeBuilder();
+        nb.setKey(nk);
+        return nb.build();
+    }
+
+    public static InstanceIdentifier<Node> getInstanceIdentifier(Node node) {
+        TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+        InstanceIdentifier<Node> nodeII = InstanceIdentifier.create(NetworkTopology.class)
+            .child(Topology.class, NETCONF_TOPOLOGY_KEY).child(Node.class, node.getKey());
+        return nodeII;
+    }
+
+    public static Optional<Streams> getAvailableStream(String Name, boolean replaySupport) {
+        Stream stream = new StreamBuilder().setName(new StreamNameType(Name)).setReplaySupport(replaySupport).build();
+        List<Stream> streamList = new ArrayList<>();
+        streamList.add(stream);
+        Streams streams = new StreamsBuilder().setStream(streamList).build();
+        return Optional.of(streams);
+    }
+
+}
index 269dd4d..4607bef 100644 (file)
                 <artifactId>sal-netconf-connector</artifactId>
                 <version>${mdsal.version}</version>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>messagebus-netconf</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>features-netconf-connector</artifactId>
index a98f632..f33eb11 100644 (file)
@@ -35,6 +35,7 @@
     <module>netconf-notifications-impl</module>
     <module>netconf-notifications-api</module>
     <module>sal-netconf-connector</module>
+    <module>messagebus-netconf</module>
     <module>features</module>
     <module>models</module>
     <module>tools</module>

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.