BUG 2799: Migration of Message Bus from deprecated Helium MD-SAL APIs 40/16140/13
authorIgor Bartak <ibartak@cisco.com>
Wed, 18 Mar 2015 17:00:13 +0000 (18:00 +0100)
committerIgor Bartak <ibartak@cisco.com>
Wed, 18 Mar 2015 17:00:13 +0000 (18:00 +0100)
to Lithium API (copyrights corrected)

Change-Id: I2206b4b532e4feead26c166b793966b077f0f26f
Signed-off-by: Igor Bartak <ibartak@cisco.com>
31 files changed:
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
features/netconf-connector/pom.xml
features/netconf-connector/src/main/resources/features.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-api/pom.xml
opendaylight/md-sal/messagebus-config/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/pom.xml
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java with 62% similarity]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java [deleted file]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java with 67% similarity]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java [new file with mode: 0644]
opendaylight/md-sal/pom.xml

index 35d5dc2..9222e13 100644 (file)
       <version>${mdsal.version}</version>
     </dependency>
 
+
     <!-- toaster -->
     <dependency>
       <groupId>org.opendaylight.controller.samples</groupId>
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-maven-plugin</artifactId>
-        <version>${yangtools.version}</version>
-        <dependencies>
-          <dependency>
-            <groupId>org.opendaylight.yangtools</groupId>
-            <artifactId>maven-sal-api-gen-plugin</artifactId>
-            <version>${yangtools.version}</version>
-            <type>jar</type>
-          </dependency>
-          <dependency>
-            <groupId>org.opendaylight.yangtools</groupId>
-            <artifactId>yang-binding</artifactId>
-            <version>${yangtools.version}</version>
-            <type>jar</type>
-          </dependency>
-          <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal-rest-docgen-maven</artifactId>
-            <version>${mdsal.version}</version>
-            <type>jar</type>
-          </dependency>
-        </dependencies>
-        <executions>
-          <execution>
-            <goals>
-              <goal>generate-sources</goal>
-            </goals>
-            <configuration>
-              <yangFilesRootDir>src</yangFilesRootDir>
-              <codeGenerators>
-                <generator>
-                  <codeGeneratorClass>org.opendaylight.controller.sal.rest.doc.maven.StaticDocGenerator</codeGeneratorClass>
-                  <outputBaseDir>${project.build.directory}/generated-resources/swagger-api-documentation/explorer/static</outputBaseDir>
-                </generator>
-              </codeGenerators>
-              <inspectDependencies>true</inspectDependencies>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
   <scm>
index bec365c..fb368ab 100644 (file)
         <bundle>mvn:org.opendaylight.controller.samples/clustering-it-provider/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
     </feature>
-
 </features>
index 9cb2b1e..b98f839 100644 (file)
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk15on</artifactId>
     </dependency>
+
+
+     <!-- message-bus -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>messagebus-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>messagebus-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>messagebus-config</artifactId>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-connector-config</artifactId>
index 7cabbb4..24a92bd 100644 (file)
         <bundle>mvn:org.opendaylight.controller/netconf-tcp/${netconf.version}</bundle>
       </feature>
 
+    <feature name='odl-message-bus' version='${project.version}'>
+        <feature version='${project.version}'>odl-netconf-connector</feature>
+        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
+        <configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
+    </feature>
     <!-- Optional TODO: Remove TODO Comments -->
 
 </features>
index 420f888..d9f0f88 100644 (file)
             <!-- MessageBus -->
             <dependency>
                 <groupId>org.opendaylight.controller</groupId>
-                <artifactId>message-bus-api</artifactId>
+                <artifactId>messagebus-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.opendaylight.controller</groupId>
-                <artifactId>message-bus-impl</artifactId>
+                <artifactId>messagebus-impl</artifactId>
                 <version>${project.version}</version>
             </dependency>
 
index 542308a..dda7af4 100644 (file)
@@ -17,7 +17,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
         <version>1.2.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>message-bus-api</artifactId>
+    <artifactId>messagebus-api</artifactId>
     <name>${project.artifactId}</name>
 
     <packaging>bundle</packaging>
diff --git a/opendaylight/md-sal/messagebus-config/pom.xml b/opendaylight/md-sal/messagebus-config/pom.xml
new file mode 100644 (file)
index 0000000..262a53e
--- /dev/null
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.\r
+\r
+This program and the accompanying materials are made available under the\r
+terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+and is available at http://www.eclipse.org/legal/epl-v10.html\r
+-->\r
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+\r
+  <modelVersion>4.0.0</modelVersion>\r
+\r
+  <parent>\r
+    <groupId>org.opendaylight.controller</groupId>\r
+    <artifactId>sal-parent</artifactId>\r
+    <version>1.2.0-SNAPSHOT</version>\r
+  </parent>\r
+\r
+  <artifactId>messagebus-config</artifactId>\r
+  <packaging>jar</packaging>\r
+  <description>Configuration files for message-bus</description>\r
+\r
+  <dependencies>\r
+    <dependency>\r
+      <groupId>org.opendaylight.controller</groupId>\r
+      <artifactId>config-api</artifactId>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>org.opendaylight.controller</groupId>\r
+      <artifactId>messagebus-api</artifactId>\r
+      <version>${project.version}</version>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>org.opendaylight.controller</groupId>\r
+      <artifactId>messagebus-impl</artifactId>\r
+      <version>${project.version}</version>\r
+    </dependency>\r
+  </dependencies>\r
+\r
+  <build>\r
+    <plugins>\r
+        <plugin>\r
+        <groupId>org.codehaus.mojo</groupId>\r
+        <artifactId>build-helper-maven-plugin</artifactId>\r
+        <executions>\r
+          <execution>\r
+            <id>attach-artifacts</id>\r
+            <goals>\r
+              <goal>attach-artifact</goal>\r
+            </goals>\r
+            <phase>package</phase>\r
+            <configuration>\r
+              <artifacts>\r
+                <artifact>\r
+                  <file>${project.build.directory}/classes/initial/05-message-bus.xml</file>\r
+                  <type>xml</type>\r
+                  <classifier>config</classifier>\r
+                </artifact>\r
+              </artifacts>\r
+            </configuration>\r
+          </execution>\r
+        </executions>\r
+      </plugin>\r
+    </plugins>\r
+  </build>\r
+\r
+  <scm>\r
+      <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>\r
+      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>\r
+      <tag>HEAD</tag>\r
+      <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>\r
+   </scm>\r
+</project>\r
diff --git a/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml b/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml
new file mode 100644 (file)
index 0000000..eed06cf
--- /dev/null
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+      <configuration>
+      <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+          <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+              <module>
+                  <name>messagebus-app</name>
+                  <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
+                  <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
+                      <type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
+                      <name>binding-osgi-broker</name>
+                  </binding-broker>
+                  <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
+                      <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+                      <name>dom-broker</name>
+                  </dom-broker>
+                  <namespace-to-stream>
+                      <urn-prefix>urn:ietf:params:xml:ns:yang:smiv2</urn-prefix>
+                      <stream-name>SNMP</stream-name>
+                  </namespace-to-stream>
+                  <namespace-to-stream>
+                      <urn-prefix>urn:ietf:params:xml:ns:yang:ietf-syslog-notification</urn-prefix>
+                      <stream-name>SYSLOG</stream-name>
+                  </namespace-to-stream>
+              </module>
+          </modules>
+      </data>
+  </configuration>
+  <required-capabilities>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl?module=messagebus-app-impl&amp;revision=2015-02-03</capability>
+  </required-capabilities>
+</snapshot>
index ccb7219..7e3b599 100644 (file)
@@ -1,4 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.\r
+\r
+This program and the accompanying materials are made available under the\r
+terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+and is available at http://www.eclipse.org/legal/epl-v10.html\r
+-->\r
 <project xmlns="http://maven.apache.org/POM/4.0.0"\r
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
@@ -10,7 +17,7 @@
         <version>1.2.0-SNAPSHOT</version>\r
     </parent>\r
 \r
-    <artifactId>message-bus-impl</artifactId>\r
+    <artifactId>messagebus-impl</artifactId>\r
     <name>${project.artifactId}</name>\r
 \r
     <packaging>bundle</packaging>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-binding-api</artifactId>\r
         </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-netconf-connector</artifactId>\r
+        </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-core-api</artifactId>\r
         </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
-            <artifactId>message-bus-api</artifactId>\r
+            <artifactId>messagebus-api</artifactId>\r
+            <version>1.2.0-SNAPSHOT</version>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-netconf-connector</artifactId>\r
         </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-binding-config</artifactId>\r
         </dependency>\r
+\r
+        <!-- Testing Dependencies -->\r
+        <dependency>\r
+              <groupId>junit</groupId>\r
+              <artifactId>junit</artifactId>\r
+              <scope>test</scope>\r
+        </dependency>\r
+        <dependency>\r
+              <groupId>org.glassfish.jersey.test-framework.providers</groupId>\r
+              <artifactId>jersey-test-framework-provider-grizzly2</artifactId>\r
+              <scope>test</scope>\r
+        </dependency>\r
+        <dependency>\r
+              <groupId>org.mockito</groupId>\r
+              <artifactId>mockito-all</artifactId>\r
+              <scope>test</scope>\r
+        </dependency>\r
     </dependencies>\r
 \r
     <build>\r
index 1c2b78a..022292a 100644 (file)
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,17 +7,24 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
+import java.util.List;
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.mdsal.InitializationContext;
-import org.opendaylight.controller.mdsal.Providers;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
+import org.opendaylight.controller.messagebus.app.impl.NetconfEventSourceManager;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
-public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+public class MessageBusAppImplModule extends
+        org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
 
     private BundleContext bundleContext;
@@ -26,49 +33,55 @@ public class MessageBusAppImplModule extends org.opendaylight.controller.config.
         return bundleContext;
     }
 
-    public void setBundleContext(BundleContext bundleContext) {
+    public void setBundleContext(final BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
 
-    public MessageBusAppImplModule( ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+    public MessageBusAppImplModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public MessageBusAppImplModule( ModuleIdentifier identifier,
-                                    DependencyResolver dependencyResolver,
-                                    MessageBusAppImplModule oldModule,
-                                    java.lang.AutoCloseable oldInstance) {
+    public MessageBusAppImplModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver,
+            final MessageBusAppImplModule oldModule, final java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
     @Override
-    protected void customValidation() {}
+    protected void customValidation() {
+    }
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
-        InitializationContext ic = new InitializationContext(namespaceMapping);
+        final List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
+
+        final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
+        final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
 
-        final Providers.BindingAware bap = new Providers.BindingAware(ic);
-        final Providers.BindingIndependent bip = new Providers.BindingIndependent(ic);
+        final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
+        final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
+        final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
+        final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
+        final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
 
-        getBindingBrokerDependency().registerProvider(bap, getBundleContext());
-        getDomBrokerDependency().registerProvider(bip);
+        final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
+        final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(dataBroker, domPublish,
+                domMount, bindingMount, eventSourceTopology, getNamespaceToStream());
 
-        AutoCloseable closer = new AutoCloseable() {
-            @Override public void close()  {
-                closeProvider(bap);
-                closeProvider(bip);
+        final AutoCloseable closer = new AutoCloseable() {
+            @Override
+            public void close() {
+                eventSourceTopology.close();
+                eventSourceManager.close();
             }
         };
 
         return closer;
     }
 
-    private void closeProvider(AutoCloseable closable) {
+    private void closeProvider(final AutoCloseable closable) {
         try {
             closable.close();
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
         }
     }
@@ -1,12 +1,12 @@
 /**
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.mdsal;
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
@@ -19,16 +19,10 @@ public class Providers {
     private static final Logger LOGGER = LoggerFactory.getLogger(Providers.class);
 
     public static class BindingAware implements BindingAwareProvider, AutoCloseable {
-        private final InitializationContext initializationContext;
 
-        public BindingAware(InitializationContext ic) {
-            this.initializationContext = ic;
-        }
 
         @Override
-        public void onSessionInitiated(BindingAwareBroker.ProviderContext session) {
-            initializationContext.set(session);
-
+        public void onSessionInitiated(final BindingAwareBroker.ProviderContext session) {
             LOGGER.info("BindingAwareBroker.ProviderContext initialized");
         }
 
@@ -37,16 +31,9 @@ public class Providers {
     }
 
     public static class BindingIndependent extends AbstractProvider implements AutoCloseable {
-        private final InitializationContext initializationContext;
-
-        public BindingIndependent(InitializationContext ic) {
-            this.initializationContext = ic;
-        }
 
         @Override
-        public void onSessionInitiated(Broker.ProviderSession session) {
-            initializationContext.set(session);
-
+        public void onSessionInitiated(final Broker.ProviderSession session) {
             LOGGER.info("Broker.ProviderSession initialized");
         }
 
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java
deleted file mode 100644 (file)
index a881fac..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.mdsal;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-public class DataStore {
-    private static final FutureCallback<Void> DEFAULT_CALLBACK =
-            new FutureCallback<Void>() {
-                public void onSuccess(Void result) {
-                    // TODO: Implement default behaviour
-                }
-
-                public void onFailure(Throwable t) {
-                    // TODO: Implement default behaviour
-                };
-            };
-
-    private final MdSAL mdSAL;
-
-    public DataStore(MdSAL mdSAL) {
-        this.mdSAL = mdSAL;
-    }
-
-    public ListenerRegistration<DataChangeListener> registerDataChangeListener(LogicalDatastoreType store,
-                                                                               InstanceIdentifier<?> path,
-                                                                               DataChangeListener listener,
-                                                                               AsyncDataBroker.DataChangeScope triggeringScope) {
-        return mdSAL.getDataBroker().registerDataChangeListener(store, path, listener, triggeringScope);
-    }
-
-    public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
-                                                InstanceIdentifier<T> path,
-                                                T data) {
-        asyncPUT(datastoreType, path, data, DEFAULT_CALLBACK);
-    }
-
-    public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
-                                                InstanceIdentifier<T> path,
-                                                T data,
-                                                FutureCallback<Void> callback) {
-        WriteTransaction tx = mdSAL.getDataBroker().newWriteOnlyTransaction();
-        tx.put(datastoreType, path, data, true);
-        execPut(tx, callback);
-    }
-
-    public <T extends DataObject> T read(LogicalDatastoreType datastoreType,
-                                         InstanceIdentifier<T> path) {
-
-        ReadOnlyTransaction tx = mdSAL.getDataBroker().newReadOnlyTransaction();
-        T result = null;
-
-        try {
-            result = tx.read(datastoreType, path).get().get();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return result;
-    }
-
-    private static void execPut(WriteTransaction tx, FutureCallback<Void> callback) {
-        Futures.addCallback(tx.submit(), callback);
-    }
-}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java
deleted file mode 100644 (file)
index c73fb2a..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.mdsal;
-
-import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-import org.opendaylight.controller.messagebus.app.impl.EventAggregator;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceManager;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class InitializationContext {
-    private static final Logger LOGGER = LoggerFactory.getLogger(InitializationContext.class);
-
-    private final MdSAL mdSal;
-    private final DataStore dataStore;
-    private final EventSourceTopology eventSourceTopology;
-    private final EventSourceManager eventSourceManager;
-    private final EventAggregator eventAggregator;
-
-    public InitializationContext(List<NamespaceToStream> namespaceMapping) {
-        this.mdSal = new MdSAL();
-        this.dataStore = new DataStore(mdSal);
-        this.eventSourceTopology = new EventSourceTopology(dataStore);
-        this.eventSourceManager = new EventSourceManager(dataStore, mdSal, eventSourceTopology, namespaceMapping);
-        this.eventAggregator = new EventAggregator(mdSal, eventSourceTopology);
-    }
-
-    public synchronized void set(BindingAwareBroker.ProviderContext session) {
-        mdSal.setBindingAwareContext(session);
-
-        if (mdSal.isReady()) {
-            initialize();
-        }
-    }
-
-    public synchronized void set(Broker.ProviderSession session) {
-        mdSal.setBindingIndependentContext(session);
-
-        if (mdSal.isReady()) {
-            initialize();
-        }
-    }
-
-    private void initialize() {
-        eventSourceTopology.mdsalReady();
-        eventSourceManager.mdsalReady();
-        eventAggregator.mdsalReady();
-
-        LOGGER.info("InitializationContext started.");
-    }
-}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java
deleted file mode 100644 (file)
index 03b220a..0000000
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.mdsal;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.mount.MountInstance;
-import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
-import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
-import org.opendaylight.controller.sal.core.api.notify.NotificationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.RpcService;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MdSAL {
-    private static final Logger LOGGER = LoggerFactory.getLogger(MdSAL.class);
-
-    private BindingAwareBroker.ProviderContext bindingAwareContext;
-    private Broker.ProviderSession bindingIndependentContext;
-
-    // -----------------------------
-    // ----- FRAMEWORK METHODS -----
-    // -----------------------------
-    public void setBindingAwareContext(BindingAwareBroker.ProviderContext bindingAwareContext) {
-        this.bindingAwareContext = bindingAwareContext;
-    }
-
-    public void setBindingIndependentContext(Broker.ProviderSession bindingIndependentContext) {
-        this.bindingIndependentContext = bindingIndependentContext;
-    }
-
-    //TODO: We should hide brokers and expose functionalities instead
-    public DataBroker getDataBroker() {
-        return getBaSalService(DataBroker.class);
-    }
-
-    public synchronized boolean isReady() {
-        return (bindingAwareContext != null && bindingIndependentContext != null);
-    }
-
-    // -----------------------
-    // ----- API METHODS -----
-    // -----------------------
-    // TODO: Factor out API methods to interface
-    // method does not return registration object. Rather will hold references internally and manipulate using node id and API
-    public <T extends RpcService> void addRpcImplementation(Class<T> serviceInterface,
-                                                            T implementation)
-            throws IllegalStateException {
-        bindingAwareContext.addRpcImplementation(serviceInterface, implementation);
-    }
-
-    // method does not return registration object. Rather will hold references internally and manipulate using node id and API
-    public <T extends RpcService> void addRpcImplementation(Node node,
-                                                            Class<T> serviceInterface,
-                                                            T implementation)
-            throws IllegalStateException {
-        BindingAwareBroker.RoutedRpcRegistration<T> registration
-                = addRoutedRpcImplementation(serviceInterface, implementation);
-
-        NodeRef nodeRef = createNodeRef(node.getId());
-        registration.registerPath(NodeContext.class, nodeRef.getValue());
-    }
-
-    public ListenerRegistration<NotificationListener> addNotificationListener(String nodeId,
-                                                                              QName notification,
-                                                                              NotificationListener listener) {
-        YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
-
-        NotificationService notificationService =
-                getBiSalService(DOMMountPointService.class)
-                        .getMountPoint(yii)
-                        .get()
-                        .getService(NotificationPublishService.class)
-                        .get();
-
-        ListenerRegistration<NotificationListener> registration =
-                notificationService.addNotificationListener(notification, listener);
-
-        LOGGER.info("Notification listener registered for {}, at node {}", notification, nodeId);
-
-        return registration;
-    }
-
-    public ListenerRegistration<NotificationListener> addNotificationListener(QName notification,
-                                                                              NotificationListener listener) {
-        NotificationService notificationService =
-                getBiSalService(NotificationPublishService.class);
-
-        ListenerRegistration<NotificationListener> registration =
-                notificationService.addNotificationListener(notification, listener);
-
-        LOGGER.info("Notification listener registered for {}.", notification);
-
-        return registration;
-    }
-
-    public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
-        return bindingAwareContext.getRpcService(serviceInterface);
-    }
-
-    public <T extends RpcService> T getRpcService(String nodeId, Class<T> serviceInterface) {
-        MountProviderService mountProviderService = getBaSalService(MountProviderService.class);
-
-        InstanceIdentifier<Node> key = InstanceIdentifier.create(Nodes.class)
-                                                         .child(Node.class,
-                                                                 new NodeKey(new NodeId(nodeId)));
-
-        MountInstance mountPoint = mountProviderService.getMountPoint(key);
-        return mountPoint.getRpcService(serviceInterface);
-    }
-
-    public void publishNotification(CompositeNode notification) {
-        getBiSalService(NotificationPublishService.class).publish(notification);
-    }
-
-    public SchemaContext getSchemaContext(String nodeId) {
-        YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
-
-        SchemaContext schemaContext =
-                getBiSalService(DOMMountPointService.class)
-                        .getMountPoint(yii)
-                        .get().getSchemaContext();
-
-        return schemaContext;
-    }
-
-    // ---------------------------
-    // ----- UTILITY METHODS -----
-    // ---------------------------
-    private <T extends BindingAwareService> T getBaSalService(Class<T> service) {
-        return bindingAwareContext.getSALService(service);
-    }
-
-    private <T extends BrokerService> T getBiSalService(Class<T> service) {
-        return bindingIndependentContext.getService(service);
-    }
-
-    private static final String NODE_ID_NAME = "id";
-
-    public static YangInstanceIdentifier inventoryNodeBIIdentifier(String nodeId) {
-        return YangInstanceIdentifier.builder()
-                .node(Nodes.QNAME)
-                .nodeWithKey(Node.QNAME,
-                             QName.create(Node.QNAME.getNamespace(),
-                                          Node.QNAME.getRevision(),
-                                          NODE_ID_NAME),
-                             nodeId)
-                .build();
-    }
-
-    private <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface,
-                                                                                                          T implementation)
-            throws IllegalStateException {
-        return bindingAwareContext.addRoutedRpcImplementation(serviceInterface, implementation);
-    }
-
-    public static NodeRef createNodeRef(NodeId nodeId) {
-        NodeKey nodeKey = new NodeKey(nodeId);
-        InstanceIdentifier<Node> path = InstanceIdentifier
-                .builder(Nodes.class)
-                .child(Node.class, nodeKey)
-                .build();
-        return new NodeRef(path);
-    }
-}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java
deleted file mode 100644 (file)
index 4b77bf2..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.messagebus.app.impl;
-
-import java.util.List;
-import java.util.concurrent.Future;
-import org.opendaylight.controller.mdsal.MdSAL;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// TODO: implement topic created notification
-public class EventAggregator implements EventAggregatorService {
-    private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class);
-
-    private final MdSAL mdSAL;
-    private final EventSourceTopology eventSourceTopology;
-
-    public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) {
-        this.mdSAL = mdSAL;
-        this.eventSourceTopology = eventSourceTopology;
-    }
-
-    public void mdsalReady() {
-        mdSAL.addRpcImplementation(EventAggregatorService.class, this);
-    }
-
-    @Override
-    public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
-        LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
-                input.getNotificationPattern(),
-                input.getNodeIdPattern());
-
-        Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL);
-
-        //# Make sure we capture all nodes from now on
-        eventSourceTopology.registerDataChangeListener(topic);
-
-        //# Notify existing nodes
-        //# Code reader note: Context of Node type is NetworkTopology
-        List<Node> nodes = eventSourceTopology.snapshot();
-        for (Node node : nodes) {
-            NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode();
-            topic.notifyNode(nodeIdToNotify);
-        }
-
-        CreateTopicOutput cto = new CreateTopicOutputBuilder()
-                .setTopicId(topic.getTopicId())
-                .build();
-
-        return Util.resultFor(cto);
-    }
-
-    @Override
-    public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
-        // 1. UNREGISTER DATA CHANGE LISTENER -> ?
-        // 2. CLOSE TOPIC
-        return null;
-    }
-}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java
deleted file mode 100644 (file)
index a84eddd..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.messagebus.app.impl;
-
-import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.mdsal.DataStore;
-import org.opendaylight.controller.mdsal.MdSAL;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public final class EventSourceManager implements DataChangeListener {
-    private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceManager.class);
-    private static final InstanceIdentifier<Node> INVENTORY_PATH = InstanceIdentifier.create(Nodes.class)
-                                                                                     .child(Node.class);
-    private final DataStore dataStore;
-    private final MdSAL mdSal;
-    private final EventSourceTopology eventSourceTopology;
-    private final Map<String, String> streamMap;
-
-    public EventSourceManager(DataStore dataStore,
-                              MdSAL mdSal,
-                              EventSourceTopology eventSourceTopology,
-                              List<NamespaceToStream> namespaceMapping) {
-        this.dataStore = dataStore;
-        this.mdSal = mdSal;
-        this.eventSourceTopology = eventSourceTopology;
-        this.streamMap = namespaceToStreamMapping(namespaceMapping);
-    }
-
-    private Map namespaceToStreamMapping(List<NamespaceToStream> namespaceMapping) {
-        Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
-
-        for (NamespaceToStream nToS  : namespaceMapping) {
-            streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
-        }
-
-        return streamMap;
-    }
-
-    public void mdsalReady() {
-        dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
-                                             INVENTORY_PATH,
-                                             this,
-                                             DataBroker.DataChangeScope.SUBTREE);
-
-        LOGGER.info("EventSourceManager initialized.");
-    }
-
-    @Override
-    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
-        LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
-
-        Node node = Util.getAffectedNode(event);
-        // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
-        if ( node == null ) {
-            LOGGER.debug("OnDataChanged Event. Node is null.");
-            return;
-        }
-        if ( isNetconfNode(node) == false ) {
-            LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
-            return;
-        }
-        if ( isEventSource(node) == false ) {
-            LOGGER.debug("OnDataChanged Event. Node an EventSource node.");
-            return;
-        }
-
-        NetconfEventSource netconfEventSource = new NetconfEventSource(mdSal,
-                                                                       node.getKey().getId().getValue(),
-                                                                       streamMap);
-        mdSal.addRpcImplementation(node, EventSourceService.class, netconfEventSource);
-
-        InstanceIdentifier<NetconfNode> nodeInstanceIdentifier =
-                InstanceIdentifier.create(Nodes.class)
-                        .child(Node.class, node.getKey())
-                        .augmentation(NetconfNode.class);
-
-        dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
-                nodeInstanceIdentifier,
-                netconfEventSource,
-                DataBroker.DataChangeScope.SUBTREE);
-
-        eventSourceTopology.insert(node);
-    }
-
-    private boolean isNetconfNode(Node node)  {
-        return node.getAugmentation(NetconfNode.class) != null ;
-    }
-
-    public boolean isEventSource(Node node) {
-        NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
-
-        return isEventSource(netconfNode);
-    }
-
-    private boolean isEventSource(NetconfNode node) {
-        for (String capability : node.getInitialCapability()) {
-            if(capability.startsWith("urn:ietf:params:xml:ns:netconf:notification")) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-}
\ No newline at end of file
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -9,36 +9,36 @@
 package org.opendaylight.controller.messagebus.app.impl;
 
 import com.google.common.base.Preconditions;
+import java.util.Map;
 import java.util.regex.Pattern;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.mdsal.MdSAL;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.LoggerFactory;
 
-public class Topic implements DataChangeListener {
-    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Topic.class);
+public class EventSourceTopic implements DataChangeListener {
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
     private final NotificationPattern notificationPattern;
+    private final EventSourceService sourceService;
     private final Pattern nodeIdPattern;
     private final TopicId topicId;
-    private final MdSAL mdSal;
 
-    public Topic(final NotificationPattern notificationPattern, final String nodeIdPattern, final MdSAL mdSal) {
+    public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) {
         this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
+        this.sourceService = eventSource;
 
         // FIXME: regex should be the language of nodeIdPattern
         final String regex = Util.wildcardToRegex(nodeIdPattern);
         this.nodeIdPattern = Pattern.compile(regex);
-        this.mdSal = Preconditions.checkNotNull(mdSal);
+
 
         // FIXME: We need to perform some salting in order to make
         //        the topic IDs less predictable.
@@ -51,26 +51,27 @@ public class Topic implements DataChangeListener {
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-        // TODO: affected must return topologyNode !!!
-        final Node node = Util.getAffectedNode(event);
-        if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
-            notifyNode(node.getId());
-        } else {
-            LOG.debug("Skipping node {}", node.getId());
+                for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+            if (changeEntry.getValue() instanceof Node) {
+                final Node node = (Node) changeEntry.getValue();
+                if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
+                    notifyNode(changeEntry.getKey());
+                }
+            }
         }
     }
 
-    public void notifyNode(final NodeId nodeId) {
-        JoinTopicInput jti = getJoinTopicInputArgument(nodeId);
-        EventSourceService ess = mdSal.getRpcService(EventSourceService.class);
-        Preconditions.checkState(ess != null, "EventSourceService is not registered");
-
-        ess.joinTopic(jti);
+    public void notifyNode(final InstanceIdentifier<?> nodeId) {
+        try {
+            sourceService.joinTopic(getJoinTopicInputArgument(nodeId));
+        } catch (final Exception e) {
+            LOG.error("Could not invoke join topic for node {}", nodeId);
+        }
     }
 
-    private JoinTopicInput getJoinTopicInputArgument(final NodeId nodeId) {
-        NodeRef nodeRef = MdSAL.createNodeRef(nodeId);
-        JoinTopicInput jti =
+    private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
+        final NodeRef nodeRef = new NodeRef(path);
+        final JoinTopicInput jti =
                 new JoinTopicInputBuilder()
                         .setNode(nodeRef.getValue())
                         .setTopicId(topicId)
@@ -78,4 +79,6 @@ public class Topic implements DataChangeListener {
                         .build();
         return jti;
     }
+
+
 }
index c070097..603f34b 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 
 package org.opendaylight.controller.messagebus.app.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.mdsal.DataStore;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.regex.Pattern;
 
-public class EventSourceTopology {
-    private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class);
+public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
 
-    private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ;
-    private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId));
-    private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL;
+    private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
+    private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
+    private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
 
-    private static final InstanceIdentifier<Topology> topologyInstanceIdentifier =
+    private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
             InstanceIdentifier.create(NetworkTopology.class)
-                    .child(Topology.class, topologyKey);
+                    .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
 
-    private static final InstanceIdentifier<TopologyTypes1> topologyTypeInstanceIdentifier =
-            topologyInstanceIdentifier
+    private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
+            EVENT_SOURCE_TOPOLOGY_PATH
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private static final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
-                                            .network.topology.rev131021.network.topology.topology.Node> eventSourceTopologyPath =
-            InstanceIdentifier.create(NetworkTopology.class)
-                    .child(Topology.class)
-                    .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
-                            .network.topology.rev131021.network.topology.topology.Node.class);
-
     private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
             new ConcurrentHashMap<>();
 
-    private final DataStore dataStore;
+    private final DataBroker dataBroker;
+    private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
+    private final EventSourceService eventSourceService;
+    private final RpcProviderRegistry rpcRegistry;
+    private final ExecutorService executorService;
+
+    public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+        this.dataBroker = dataBroker;
+        this.executorService = Executors.newCachedThreadPool();
+        this.rpcRegistry = rpcRegistry;
+        aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
+        eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
+
+        final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
+        final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+        putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+    }
+
+    private <T extends DataObject>  void putData(final LogicalDatastoreType store,
+            final InstanceIdentifier<T> path, final T data) {
 
-    public EventSourceTopology(DataStore dataStore) {
-        this.dataStore = dataStore;
+        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        tx.put(store, path, data, true);
+        tx.submit();
     }
 
-    public void mdsalReady() {
-        TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
-        TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
+        final NodeKey nodeKey = node.getKey();
+        final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+        final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
+        putData(OPERATIONAL, augmentPath, nodeAgument);
+    }
 
-        dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment);
+    private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
+        executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
     }
 
-    public void insert(Node node) {
-        String nodeId = node.getKey().getId().getValue();
-        NodeKey nodeKey = new NodeKey(new NodeId(nodeId));
-        InstanceIdentifier<Node1> topologyNodeAugment
-                = topologyInstanceIdentifier
-                .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
-                        .network.topology.rev131021.network.topology.topology.Node.class, nodeKey)
-                .augmentation(Node1.class);
-
-        Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build();
-        dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument);
+    @Override
+    public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+        LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
+        final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+        final String nodeIdPattern = input.getNodeIdPattern().getValue();
+        final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
+        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+
+        registerTopic(eventSourceTopic);
+
+        notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+
+        final CreateTopicOutput cto = new CreateTopicOutputBuilder()
+                .setTopicId(eventSourceTopic.getTopicId())
+                .build();
+
+        return Util.resultFor(cto);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
+        return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
     }
 
-    // TODO: Should we expose this functioanlity over RPC?
-    public List<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
-                .network.topology.rev131021.network.topology.topology.Node> snapshot() {
-        Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier);
-        return topology.getNode();
+    @Override
+    public void close() {
+        aggregatorRpcReg.close();
     }
 
-    public void registerDataChangeListener(DataChangeListener listener) {
-        ListenerRegistration<DataChangeListener> listenerRegistration = dataStore.registerDataChangeListener(datastoreType,
-                eventSourceTopologyPath,
+    public void registerTopic(final EventSourceTopic listener) {
+        final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
+                EVENT_SOURCE_TOPOLOGY_PATH,
                 listener,
                 DataBroker.DataChangeScope.SUBTREE);
 
         registrations.put(listener, listenerRegistration);
     }
+
+    public void register(final Node node, final NetconfEventSource netconfEventSource) {
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
+        rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
+            .registerPath(NodeContext.class, sourcePath);
+        insert(sourcePath,node);
+        // FIXME: Return registration object.
+    }
+
+    private class NotifyAllNodeExecutor implements Runnable {
+
+        private final EventSourceTopic topic;
+        private final DataBroker dataBroker;
+        private final Pattern nodeIdPatternRegex;
+
+        public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
+            this.topic = topic;
+            this.dataBroker = dataBroker;
+            this.nodeIdPatternRegex = nodeIdPatternRegex;
+        }
+
+        @Override
+        public void run() {
+            //# Code reader note: Context of Node type is NetworkTopology
+            final List<Node> nodes = snapshot();
+            for (final Node node : nodes) {
+                if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+                    topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+                }
+            }
+        }
+
+        private List<Node> snapshot() {
+            try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
+
+                final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
+
+                if(data.isPresent()) {
+                    final List<Node> nodeList = data.get().getNode();
+                    if(nodeList != null) {
+                        return nodeList;
+                    }
+                }
+                return Collections.emptyList();
+            } catch (final ReadFailedException e) {
+                LOG.error("Unable to retrieve node list.", e);
+                return Collections.emptyList();
+            }
+        }
+    }
 }
index 9c0697f..0d54beb 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,57 +8,92 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.mdsal.MdSAL;
-import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+
+public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
+
+    private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
+    private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
+    private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
+
+    private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
+    private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
 
-public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener {
-    private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class);
 
-    private final MdSAL mdSal;
     private final String nodeId;
 
-    private final List<String> activeStreams = new ArrayList<>();
+
+    private final DOMMountPoint netconfMount;
+    private final DOMNotificationPublishService domPublish;
+    private final NotificationsService notificationRpcService;
+
+    private final Set<String> activeStreams = new ConcurrentSkipListSet<>();
 
     private final Map<String, String> urnPrefixToStreamMap;
 
-    public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map<String, String> streamMap) {
-        Preconditions.checkNotNull(mdSal);
-        Preconditions.checkNotNull(nodeId);
 
-        this.mdSal = mdSal;
+    public NetconfEventSource(final String nodeId, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
+        this.netconfMount = netconfMount;
+        this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class);
         this.nodeId = nodeId;
         this.urnPrefixToStreamMap = streamMap;
-
-        LOGGER.info("NetconfEventSource [{}] created.", nodeId);
+        this.domPublish = publishService;
+        LOG.info("NetconfEventSource [{}] created.", nodeId);
     }
 
     @Override
@@ -69,63 +104,63 @@ public class NetconfEventSource implements EventSourceService, NotificationListe
         final String regex = Util.wildcardToRegex(notificationPattern.getValue());
 
         final Pattern pattern = Pattern.compile(regex);
-        List<QName> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
+        final List<SchemaPath> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
         registerNotificationListener(matchingNotifications);
-        return null;
+        final JoinTopicOutput output = new JoinTopicOutputBuilder().build();
+        return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build());
     }
 
-    private List<QName> availableNotifications() {
+    private List<SchemaPath> availableNotifications() {
         // FIXME: use SchemaContextListener to get changes asynchronously
-        Set<NotificationDefinition> availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications();
-        List<QName> qNs = new ArrayList<>(availableNotifications.size());
-        for (NotificationDefinition nd : availableNotifications) {
-            qNs.add(nd.getQName());
+        final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
+        final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+        for (final NotificationDefinition nd : availableNotifications) {
+            qNs.add(nd.getPath());
         }
-
         return qNs;
     }
 
-    private void registerNotificationListener(final List<QName> notificationsToSubscribe) {
-        for (QName qName : notificationsToSubscribe) {
-            startSubscription(qName);
-            // FIXME: do not lose this registration
-            final ListenerRegistration<NotificationListener> reg = mdSal.addNotificationListener(nodeId, qName, this);
+    private void registerNotificationListener(final List<SchemaPath> notificationsToSubscribe) {
+
+        final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+        if(notifyService.isPresent()) {
+            for (final SchemaPath qName : notificationsToSubscribe) {
+                startSubscription(qName);
+            }
+            // FIXME: Capture registration
+            notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
         }
     }
 
-    private synchronized void startSubscription(final QName qName) {
-        String streamName = resolveStream(qName);
+    private void startSubscription(final SchemaPath path) {
+        final String streamName = resolveStream(path.getLastComponent());
 
         if (streamIsActive(streamName) == false) {
-            LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+            LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
             startSubscription(streamName);
         }
     }
 
-    private synchronized void resubscribeToActiveStreams() {
-        for (String streamName : activeStreams) {
+    private void resubscribeToActiveStreams() {
+        for (final String streamName : activeStreams) {
             startSubscription(streamName);
         }
     }
 
     private synchronized void startSubscription(final String streamName) {
-        CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName);
-        mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput);
+        final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+            .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
+            .build();
+        netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
         activeStreams.add(streamName);
     }
 
-    private static CreateSubscriptionInput getSubscriptionInput(final String streamName) {
-        CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder();
-        csib.setStream(new StreamNameType(streamName));
-        return csib.build();
-    }
-
     private String resolveStream(final QName qName) {
         String streamName = null;
 
-        for (Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
-            String nameSpace = qName.getNamespace().toString();
-            String urnPrefix = entry.getKey();
+        for (final Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
+            final String nameSpace = qName.getNamespace().toString();
+            final String urnPrefix = entry.getKey();
             if( nameSpace.startsWith(urnPrefix) ) {
                 streamName = entry.getValue();
                 break;
@@ -139,24 +174,40 @@ public class NetconfEventSource implements EventSourceService, NotificationListe
         return activeStreams.contains(streamName);
     }
 
-    // PASS
-    @Override public Set<QName> getSupportedNotifications() {
-        return null;
+    @Override
+    public void onNotification(final DOMNotification notification) {
+        final ContainerNode topicNotification = Builders.containerBuilder()
+                .withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
+                .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId))
+                .withChild(encapsulate(notification))
+                .build();
+        try {
+            domPublish.putNotification(new TopicDOMNotification(topicNotification));
+        } catch (final InterruptedException e) {
+            throw Throwables.propagate(e);
+        }
     }
 
-    @Override
-    public void onNotification(final CompositeNode notification) {
-        LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification);
-        ImmutableCompositeNode payload = ImmutableCompositeNode.builder()
-                .setQName(QName.create(TopicNotification.QNAME, "payload"))
-                .add(notification).toInstance();
-        ImmutableCompositeNode icn = ImmutableCompositeNode.builder()
-                .setQName(TopicNotification.QNAME)
-                .add(payload)
-                .addLeaf("event-source", nodeId)
-                .toInstance();
-
-        mdSal.publishNotification(icn);
+    private AnyXmlNode encapsulate(final DOMNotification body) {
+        // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
+        final Document doc = XmlUtil.newDocument();
+        final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
+        final Element element = XmlUtil.createElement(doc , "payload", namespace);
+
+
+        final DOMResult result = new DOMResult(element);
+
+        final SchemaContext context = netconfMount.getSchemaContext();
+        final SchemaPath schemaPath = body.getType();
+        try {
+            NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
+            return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG)
+                    .withValue(new DOMSource(element))
+                    .build();
+        } catch (IOException | XMLStreamException e) {
+            LOG.error("Unable to encapsulate notification.",e);
+            throw Throwables.propagate(e);
+        }
     }
 
     @Override
@@ -164,16 +215,16 @@ public class NetconfEventSource implements EventSourceService, NotificationListe
         boolean wasConnected = false;
         boolean nowConnected = false;
 
-        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
             if ( isNetconfNode(changeEntry) ) {
-                NetconfNode nn = (NetconfNode)changeEntry.getValue();
+                final NetconfNode nn = (NetconfNode)changeEntry.getValue();
                 wasConnected = nn.isConnected();
             }
         }
 
-        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
             if ( isNetconfNode(changeEntry) ) {
-                NetconfNode nn = (NetconfNode)changeEntry.getValue();
+                final NetconfNode nn = (NetconfNode)changeEntry.getValue();
                 nowConnected = nn.isConnected();
             }
         }
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java
new file mode 100644 (file)
index 0000000..6533136
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSourceManager.class);
+    private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+    private static final InstanceIdentifier<Node> NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class)
+                .child(Topology.class, NETCONF_TOPOLOGY_KEY)
+                .child(Node.class);
+
+    private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
+            .node(NetworkTopology.QNAME)
+            .node(Topology.QNAME)
+            .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
+            .node(Node.QNAME)
+            .build();
+    private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
+
+
+    private final EventSourceTopology eventSourceTopology;
+    private final Map<String, String> streamMap;
+
+    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
+    private final ListenerRegistration<DataChangeListener> listenerReg;
+    private final DOMNotificationPublishService publishService;
+    private final DOMMountPointService domMounts;
+    private final MountPointService bindingMounts;
+
+    public NetconfEventSourceManager(final DataBroker dataStore,
+                              final DOMNotificationPublishService domPublish,
+                              final DOMMountPointService domMount,
+                              final MountPointService bindingMount,
+                              final EventSourceTopology eventSourceTopology,
+                              final List<NamespaceToStream> namespaceMapping) {
+
+        listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+        this.eventSourceTopology = eventSourceTopology;
+        this.streamMap = namespaceToStreamMapping(namespaceMapping);
+        this.domMounts = domMount;
+        this.bindingMounts = bindingMount;
+        this.publishService = domPublish;
+        LOGGER.info("EventSourceManager initialized.");
+    }
+
+    private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
+        final Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
+
+        for (final NamespaceToStream nToS  : namespaceMapping) {
+            streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
+        }
+
+        return streamMap;
+    }
+
+    @Override
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+        LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
+            if (changeEntry.getValue() instanceof Node) {
+                nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
+            }
+        }
+
+
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+            if (changeEntry.getValue() instanceof Node) {
+                nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
+            }
+        }
+
+
+    }
+
+    private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
+
+        // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
+        if ( node == null ) {
+            LOGGER.debug("OnDataChanged Event. Node is null.");
+            return;
+        }
+        if ( isNetconfNode(node) == false ) {
+            LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
+            return;
+        }
+        if ( isEventSource(node) == false ) {
+            LOGGER.debug("OnDataChanged Event. Node an EventSource node.");
+            return;
+        }
+        if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) {
+            return;
+        }
+
+        if(!netconfSources.containsKey(key)) {
+            createEventSource(key,node);
+        }
+    }
+
+    private void createEventSource(final InstanceIdentifier<?> key, final Node node) {
+        final Optional<DOMMountPoint> netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId()));
+        final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
+
+        if(netconfMount.isPresent() && bindingMount.isPresent()) {
+            final String nodeId = node.getNodeId().getValue();
+            final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
+            eventSourceTopology.register(node,netconfEventSource);
+            netconfSources.putIfAbsent(key, netconfEventSource);
+        }
+    }
+
+    private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
+        return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+    }
+
+    private boolean isNetconfNode(final Node node)  {
+        return node.getAugmentation(NetconfNode.class) != null ;
+    }
+
+    public boolean isEventSource(final Node node) {
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+
+        return isEventSource(netconfNode);
+    }
+
+    private boolean isEventSource(final NetconfNode node) {
+        for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
+            if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public void close() {
+        listenerReg.close();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java
new file mode 100644 (file)
index 0000000..bd27db7
--- /dev/null
@@ -0,0 +1,40 @@
+
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class TopicDOMNotification implements DOMNotification {
+
+    private static final SchemaPath TOPIC_NOTIFICATION_ID = SchemaPath.create(true, TopicNotification.QNAME);
+    private final ContainerNode body;
+
+    public TopicDOMNotification(final ContainerNode body) {
+        this.body = body;
+    }
+
+    @Override
+    public SchemaPath getType() {
+        return TOPIC_NOTIFICATION_ID;
+    }
+
+    @Override
+    public ContainerNode getBody() {
+        return body;
+    }
+
+    @Override
+    public String toString() {
+        return "TopicDOMNotification [body=" + body + "]";
+    }
+}
index 9927d85..1c0b8b3 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,24 +8,19 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.util.concurrent.Futures;
 import java.math.BigInteger;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
+
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+import com.google.common.util.concurrent.Futures;
 
 public final class Util {
     private static final MessageDigest messageDigestTemplate = getDigestInstance();
@@ -33,65 +28,43 @@ public final class Util {
     private static MessageDigest getDigestInstance() {
         try {
             return MessageDigest.getInstance("MD5");
-        } catch (NoSuchAlgorithmException e) {
+        } catch (final NoSuchAlgorithmException e) {
             throw new RuntimeException("Unable to get MD5 instance");
         }
     }
 
-    public static String md5String(final String inputString) {
+    static String md5String(final String inputString) {
 
         try {
-            MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
+            final MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
             md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
             return new BigInteger(1, md.digest()).toString(16);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new RuntimeException("Unable to get MD5 instance");
         }
     }
 
     public static <T> Future<RpcResult<T>> resultFor(final T output) {
-        RpcResult<T> result = Rpcs.getRpcResult(true, output, Collections.<RpcError>emptyList());
+        final RpcResult<T> result = RpcResultBuilder.success(output).build();
         return Futures.immediateFuture(result);
     }
 
-    /**
-     * Extracts affected node from data change event.
-     * @param event
-     * @return
-     */
-    public static Node getAffectedNode(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-        // TODO: expect listener method to be called even when change impact node
-        // TODO: test with change.getCreatedData()
-        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
-            if (isNode(changeEntry)) {
-                return (Node) changeEntry.getValue();
-            }
-        }
-
-        return null;
-    }
-
-    private static boolean isNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
-        return Node.class.equals(changeEntry.getKey().getTargetType());
-    }
-
     /**
      * Method filters qnames based on wildcard strings
      *
-     * @param availableQnames
+     * @param list
      * @param patterh matching pattern
      * @return list of filtered qnames
      */
-    public static List<QName> expandQname(final List<QName> availableQnames, final Pattern pattern) {
-        List<QName> matchingQnames = new ArrayList<>();
+    public static List<SchemaPath> expandQname(final List<SchemaPath> list, final Pattern pattern) {
+        final List<SchemaPath> matchingQnames = new ArrayList<>();
 
-        for (QName qname : availableQnames) {
-            String namespace = qname.getNamespace().toString();
+        for (final SchemaPath notification : list) {
+            final String namespace = notification.getLastComponent().getNamespace().toString();
             if (pattern.matcher(namespace).matches()) {
-                matchingQnames.add(qname);
+                matchingQnames.add(notification);
             }
         }
-
         return matchingQnames;
     }
 
@@ -101,9 +74,9 @@ public final class Util {
      * @return
      */
     static String wildcardToRegex(final String wildcard){
-        StringBuffer s = new StringBuffer(wildcard.length());
+        final StringBuffer s = new StringBuffer(wildcard.length());
         s.append('^');
-        for (char c : wildcard.toCharArray()) {
+        for (final char c : wildcard.toCharArray()) {
             switch(c) {
                 case '*':
                     s.append(".*");
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java
new file mode 100644 (file)
index 0000000..13c4221
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.osgi.framework.BundleContext;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class MessageBusAppImplModuleFactoryTest {
+
+    DependencyResolver dependencyResolverMock;
+    BundleContext bundleContextMock;
+    MessageBusAppImplModuleFactory messageBusAppImplModuleFactory;
+    DynamicMBeanWithInstance dynamicMBeanWithInstanceMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        dependencyResolverMock = mock(DependencyResolver.class);
+        bundleContextMock = mock(BundleContext.class);
+        dynamicMBeanWithInstanceMock = mock(DynamicMBeanWithInstance.class);
+        messageBusAppImplModuleFactory = new MessageBusAppImplModuleFactory();
+    }
+
+    @Test
+    public void createModuleTest() {
+        assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, bundleContextMock));
+    }
+
+    @Test
+    public void createModuleBTest() throws Exception{
+        MessageBusAppImplModule messageBusAppImplModuleMock = mock(MessageBusAppImplModule.class);
+        doReturn(messageBusAppImplModuleMock).when(dynamicMBeanWithInstanceMock).getModule();
+        assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, dynamicMBeanWithInstanceMock, bundleContextMock));
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java
new file mode 100644 (file)
index 0000000..0794364
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.osgi.framework.BundleContext;
+
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doNothing;
+
+public class MessageBusAppImplModuleTest {
+
+    MessageBusAppImplModule messageBusAppImplModule;
+    ModuleIdentifier moduleIdentifier;
+    DependencyResolver dependencyResolverMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        moduleIdentifier = new ModuleIdentifier("factoryName1", "instanceName1");
+        dependencyResolverMock = mock(DependencyResolver.class);
+        messageBusAppImplModule = new MessageBusAppImplModule(moduleIdentifier, dependencyResolverMock);
+    }
+
+    @Test
+    public void constructorTest() {
+        assertNotNull("Instance has not been created correctly.", messageBusAppImplModule);
+    }
+
+    @Test
+    public void constructorBTest() {
+        MessageBusAppImplModule messageBusAppImplModuleOld = mock(MessageBusAppImplModule.class);
+        java.lang.AutoCloseable oldInstanceAutocloseableMock = mock(AutoCloseable.class);
+        MessageBusAppImplModule messageBusAppImplModule = new MessageBusAppImplModule(moduleIdentifier, dependencyResolverMock, messageBusAppImplModuleOld, oldInstanceAutocloseableMock);
+        assertNotNull("Instance has not been created correctly.", messageBusAppImplModule);
+    }
+
+    @Test
+    public void setGetBundleContextTest() {
+        BundleContext bundleContext = mock(BundleContext.class);
+        messageBusAppImplModule.setBundleContext(bundleContext);
+        assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
+    }
+
+    @Test
+    public void createInstanceTest() {
+        createInstanceTestHelper();
+        messageBusAppImplModule.getInstance();
+        assertNotNull("AutoCloseable instance has not been created correctly.", messageBusAppImplModule.createInstance());
+    }
+
+    private void createInstanceTestHelper(){
+        NamespaceToStream namespaceToStream = mock(NamespaceToStream.class);
+        List<NamespaceToStream> listNamespaceToStreamMock = new ArrayList<>();
+        listNamespaceToStreamMock.add(namespaceToStream);
+        messageBusAppImplModule.setNamespaceToStream(listNamespaceToStreamMock);
+        ObjectName objectName = mock(ObjectName.class);
+        org.opendaylight.controller.sal.core.api.Broker domBrokerDependency = mock(Broker.class);
+        org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingBrokerDependency = mock(BindingAwareBroker.class);
+        when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.domBrokerJmxAttribute))).thenReturn(domBrokerDependency);
+        when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.bindingBrokerJmxAttribute))).thenReturn(bindingBrokerDependency);
+        messageBusAppImplModule.setBindingBroker(objectName);
+        messageBusAppImplModule.setDomBroker(objectName);
+        BindingAwareBroker.ProviderContext providerContextMock = mock(BindingAwareBroker.ProviderContext.class);
+        doReturn(providerContextMock).when(bindingBrokerDependency).registerProvider(any(BindingAwareProvider.class));
+        Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
+        doReturn(providerSessionMock).when(domBrokerDependency).registerProvider(any(Provider.class));
+
+        DataBroker dataBrokerMock = mock(DataBroker.class);
+        doReturn(dataBrokerMock).when(providerContextMock).getSALService(DataBroker.class);
+        RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+        doReturn(rpcProviderRegistryMock).when(providerContextMock).getSALService(RpcProviderRegistry.class);
+        BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
+        doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+        EventSourceService eventSourceServiceMock = mock(EventSourceService.class);
+        doReturn(eventSourceServiceMock).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
+
+        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(writeTransactionMock).submit();
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java
new file mode 100644 (file)
index 0000000..5e26213
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.any;
+
+public class EventSourceTopicTest {
+
+    EventSourceTopic eventSourceTopic;
+    org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dataObjectMock;
+    NodeId nodeIdMock;
+    EventSourceService eventSourceServiceMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        NotificationPattern notificationPattern = new NotificationPattern("value1");
+        eventSourceServiceMock = mock(EventSourceService.class);
+        eventSourceTopic = new EventSourceTopic(notificationPattern, "nodeIdPattern1", eventSourceServiceMock);
+    }
+
+    @Test
+    public void createModuleTest() {
+        assertNotNull("Instance has not been created correctly.", eventSourceTopic);
+    }
+
+    @Test
+    public void getTopicIdTest() {
+        assertNotNull("Topic has not been created correctly.", eventSourceTopic.getTopicId());
+    }
+
+    @Test
+    public void onDataChangedTest() {
+        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+        onDataChangedTestHelper(asyncDataChangeEventMock);
+        eventSourceTopic.onDataChanged(asyncDataChangeEventMock);
+        verify(dataObjectMock, times(1)).getId();
+        verify(nodeIdMock, times(1)).getValue();
+    }
+
+    private void onDataChangedTestHelper(AsyncDataChangeEvent asyncDataChangeEventMock){
+        Map<InstanceIdentifier<?>, DataObject> map = new HashMap<>();
+        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+        dataObjectMock = mock(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
+        map.put(instanceIdentifierMock, dataObjectMock);
+        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+
+        nodeIdMock = mock(NodeId.class);
+        doReturn(nodeIdMock).when(dataObjectMock).getId();
+        doReturn("0").when(nodeIdMock).getValue();
+    }
+
+    @Test
+    public void notifyNodeTest() {
+        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+        eventSourceTopic.notifyNode(instanceIdentifierMock);
+        verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java
new file mode 100644 (file)
index 0000000..c2f6ef5
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.eq;
+
+public class EventSourceTopologyTest {
+
+    EventSourceTopology eventSourceTopology;
+    DataBroker dataBrokerMock;
+    RpcProviderRegistry rpcProviderRegistryMock;
+    CreateTopicInput createTopicInputMock;
+    ListenerRegistration listenerRegistrationMock;
+    NodeKey nodeKey;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        dataBrokerMock = mock(DataBroker.class);
+        rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+
+    }
+
+    @Test
+    public void constructorTest() {
+        constructorTestHelper();
+        eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
+        assertNotNull("Instance has not been created correctly.", eventSourceTopology);
+    }
+
+    private void constructorTestHelper(){
+        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(writeTransactionMock).submit();
+    }
+
+    @Test
+    public void createTopicTest() throws Exception{
+        createTopicTestHelper();
+        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+    }
+
+    private void createTopicTestHelper() throws Exception{
+        constructorTestHelper();
+        createTopicInputMock = mock(CreateTopicInput.class);
+        eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
+
+        NotificationPattern notificationPattern = new NotificationPattern("value1");
+        doReturn(notificationPattern).when(createTopicInputMock).getNotificationPattern();
+        Pattern pattern = new Pattern("valuePattern1");
+        doReturn(pattern).when(createTopicInputMock).getNodeIdPattern();
+
+        listenerRegistrationMock = mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL),
+                any(InstanceIdentifier.class),
+                any(EventSourceTopic.class),
+                eq(DataBroker.DataChangeScope.SUBTREE));
+
+        ReadOnlyTransaction readOnlyTransactionMock = mock(ReadOnlyTransaction.class);
+        doReturn(readOnlyTransactionMock).when(dataBrokerMock).newReadOnlyTransaction();
+
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(readOnlyTransactionMock).read(eq(LogicalDatastoreType.OPERATIONAL),
+                any(InstanceIdentifier.class));
+        Optional optionalMock = mock(Optional.class);
+        doReturn(optionalMock).when(checkedFutureMock).checkedGet();
+        doReturn(true).when(optionalMock).isPresent();
+
+        Topology topologyMock = mock(Topology.class);
+        doReturn(topologyMock).when(optionalMock).get();
+        Node nodeMock = mock(Node.class);
+        List<Node> nodeList = new ArrayList<>();
+        nodeList.add(nodeMock);
+        doReturn(nodeList).when(topologyMock).getNode();
+
+        NodeId nodeId = new NodeId("nodeIdValue1");
+        doReturn(nodeId).when(nodeMock).getNodeId();
+    }
+
+    @Test
+    public void destroyTopicTest() throws Exception{
+        createTopicTestHelper();
+        DestroyTopicInput destroyTopicInput = null;
+        assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
+    }
+
+    @Test
+    public void closeTest() throws Exception{
+        BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
+        doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+        doNothing().when(rpcRegistrationMock).close();
+        createTopicTestHelper();
+        eventSourceTopology.createTopic(createTopicInputMock);
+        eventSourceTopology.close();
+        verify(rpcRegistrationMock, times(1)).close();
+    }
+
+    @Test
+    public void registerTest() throws Exception {
+        createTopicTestHelper();
+        Node nodeMock = mock(Node.class);
+        NetconfEventSource netconfEventSourceMock = mock(NetconfEventSource.class);
+
+        NodeId nodeId = new NodeId("nodeIdValue1");
+        nodeKey = new NodeKey(nodeId);
+        doReturn(nodeKey).when(nodeMock).getKey();
+
+        BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
+        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, netconfEventSourceMock);
+        eventSourceTopology.register(nodeMock, netconfEventSourceMock);
+        verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java
new file mode 100644 (file)
index 0000000..911c5db
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.BindingService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.notNull;
+
+public class NetconfEventSourceManagerTest {
+
+    NetconfEventSourceManager netconfEventSourceManager;
+    ListenerRegistration listenerRegistrationMock;
+    DOMMountPointService domMountPointServiceMock;
+    MountPointService mountPointServiceMock;
+    EventSourceTopology eventSourceTopologyMock;
+    AsyncDataChangeEvent asyncDataChangeEventMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        DataBroker dataBrokerMock = mock(DataBroker.class);
+        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+        domMountPointServiceMock = mock(DOMMountPointService.class);
+        mountPointServiceMock = mock(MountPointService.class);
+        eventSourceTopologyMock = mock(EventSourceTopology.class);
+        List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
+
+        listenerRegistrationMock = mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
+
+        netconfEventSourceManager = new NetconfEventSourceManager(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
+                mountPointServiceMock, eventSourceTopologyMock, namespaceToStreamList);
+    }
+
+    @Test
+    public void constructorTest() {
+        assertNotNull("Instance has not been created correctly.", netconfEventSourceManager);
+    }
+
+    @Test
+    public void onDataChangedTest() {
+        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+        Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+        Node dataObjectMock = mock(Node.class);
+        map.put(instanceIdentifierMock, dataObjectMock);
+        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
+        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(dataObjectMock, times(2)).getAugmentation(NetconfNode.class);
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTest() {
+        onDataChangedCreateEventSourceTestHelper();
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceTopologyMock, times(1)).register(any(Node.class), any(NetconfEventSource.class));
+    }
+
+    private void onDataChangedCreateEventSourceTestHelper(){
+        asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+        Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+        Node dataObjectMock = mock(Node.class);
+        map.put(instanceIdentifierMock, dataObjectMock);
+        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
+        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+
+        NetconfNode netconfNodeMock = mock(NetconfNode.class);
+        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+        doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
+        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+        List<String> availableCapabilityList = new ArrayList<>();
+        availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
+        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+
+        doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+
+        Optional optionalMock = mock(Optional.class);
+        Optional optionalBindingMountMock = mock(Optional.class);
+        NodeId nodeId = new NodeId("nodeId1");
+        doReturn(nodeId).when(dataObjectMock).getNodeId();
+        doReturn(optionalMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+        doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
+        doReturn(true).when(optionalMock).isPresent();
+        doReturn(true).when(optionalBindingMountMock).isPresent();
+
+        DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
+        MountPoint mountPointMock = mock(MountPoint.class);
+        doReturn(domMountPointMock).when(optionalMock).get();
+        doReturn(mountPointMock).when(optionalBindingMountMock).get();
+
+        RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
+        Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
+        NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+
+        doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
+        doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
+        doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+    }
+
+    @Test
+    public void isEventSourceTest() {
+        Node nodeMock = mock(Node.class);
+        NetconfNode netconfNodeMock = mock(NetconfNode.class);
+        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+        doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
+        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+        List<String> availableCapabilityList = new ArrayList<>();
+        availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
+        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+        assertTrue("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
+    }
+
+    @Test
+    public void isNotEventSourceTest() {
+        Node nodeMock = mock(Node.class);
+        NetconfNode netconfNodeMock = mock(NetconfNode.class);
+        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+        doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
+        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+        List<String> availableCapabilityList = new ArrayList<>();
+        availableCapabilityList.add("availableCapabilityString1");
+        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+        assertFalse("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
+    }
+
+    @Test
+    public void closeTest() {
+        netconfEventSourceManager.close();
+        verify(listenerRegistrationMock, times(1)).close();
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java
new file mode 100644 (file)
index 0000000..73117c1
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.BindingService;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+
+public class NetconfEventSourceTest {
+
+    NetconfEventSource netconfEventSource;
+    DOMMountPoint domMountPointMock;
+    JoinTopicInput joinTopicInputMock;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        Map<String, String> streamMap = new HashMap<>();
+        streamMap.put("string1", "string2");
+        domMountPointMock = mock(DOMMountPoint.class);
+        DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+        MountPoint mountPointMock = mock(MountPoint.class);
+
+        RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
+        Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
+        NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+
+        doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
+        doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
+        doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+        netconfEventSource = new NetconfEventSource("nodeId1", streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
+    }
+
+    @Test
+    public void constructorTest() {
+        assertNotNull("Instance has not been created correctly.", netconfEventSource);
+    }
+
+    @Test
+    public void joinTopicTest() throws Exception{
+        joinTopicTestHelper();
+        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+    }
+
+    private void joinTopicTestHelper() throws Exception{
+        joinTopicInputMock = mock(JoinTopicInput.class);
+        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+        doReturn("regexString1").when(notificationPatternMock).getValue();
+
+        SchemaContext schemaContextMock = mock(SchemaContext.class);
+        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+        notificationDefinitionSet.add(notificationDefinitionMock);
+
+        URI uri = new URI("uriStr1");
+        QName qName = new QName(uri, "localName1");
+        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+        ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+    }
+
+    @Test (expected=NullPointerException.class)
+    public void onNotificationTest() {
+        DOMNotification domNotificationMock = mock(DOMNotification.class);
+        ContainerNode containerNodeMock = mock(ContainerNode.class);
+        SchemaContext schemaContextMock = mock(SchemaContext.class);
+        SchemaPath schemaPathMock = mock(SchemaPath.class);
+        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+        doReturn(schemaPathMock).when(domNotificationMock).getType();
+        doReturn(containerNodeMock).when(domNotificationMock).getBody();
+        netconfEventSource.onNotification(domNotificationMock);
+    }
+
+    @Test
+    public void onDataChangedTest() {
+        InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
+                .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+        NetconfNode dataObjectMock = mock(NetconfNode.class);
+        Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
+        dataChangeMap.put(brmIdent, dataObjectMock);
+        doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
+        doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+
+        netconfEventSource.onDataChanged(asyncDataChangeEventMock);
+        verify(dataObjectMock, times(2)).isConnected();
+    }
+
+    @Test
+    public void onDataChangedResubscribeTest() throws Exception{
+        InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
+                .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+        NetconfNode dataObjectMock = mock(NetconfNode.class);
+        Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
+        dataChangeMap.put(brmIdent, dataObjectMock);
+        doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+        doReturn(true).when(dataObjectMock).isConnected();
+
+        Set<String> localSet = getActiveStreams();
+        localSet.add("activeStream1");
+
+        Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+        doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+        DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+        doReturn(domRpcServiceMock).when(optionalMock).get();
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+
+        netconfEventSource.onDataChanged(asyncDataChangeEventMock);
+        verify(dataObjectMock, times(1)).isConnected();
+        assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size());
+    }
+
+    private Set getActiveStreams() throws Exception{
+        Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams");
+        nesField.setAccessible(true);
+        return (Set) nesField.get(netconfEventSource);
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java
new file mode 100644 (file)
index 0000000..4872127
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class TopicDOMNotificationTest {
+
+    ContainerNode containerNodeBodyMock;
+    TopicDOMNotification topicDOMNotification;
+
+    @BeforeClass
+    public static void initTestClass() throws IllegalAccessException, InstantiationException {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        containerNodeBodyMock = mock(ContainerNode.class);
+        topicDOMNotification = new TopicDOMNotification(containerNodeBodyMock);
+    }
+
+    @Test
+    public void constructorTest() {
+        assertNotNull("Instance has not been created correctly.", topicDOMNotification);
+    }
+
+    @Test
+    public void getTypeTest() {
+        SchemaPath TOPIC_NOTIFICATION_ID = SchemaPath.create(true, TopicNotification.QNAME);
+        assertEquals("Type has not been created correctly.", TOPIC_NOTIFICATION_ID, topicDOMNotification.getType());
+    }
+
+    @Test
+    public void getBodyTest() {
+        assertEquals("String has not been created correctly.", containerNodeBodyMock, topicDOMNotification.getBody());
+    }
+
+    @Test
+    public void getToStringTest() {
+        String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMock + "]";
+        assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java
new file mode 100644 (file)
index 0000000..2ff4654
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class UtilTest {
+
+    @Test
+    public void testMD5Hash() throws Exception {
+        // empty string
+        createAndAssertHash("", "d41d8cd98f00b204e9800998ecf8427e");
+
+        // non-empty string
+        createAndAssertHash("The Guardian", "69b929ae473ed732d5fb8e0a55a8dc8d");
+
+        // the same hash for the same string
+        createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c");
+        createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); // one more time
+
+        // different strings must have different hashes
+        createAndAssertHash("orange", "fe01d67a002dfa0f3ac084298142eccd");
+        createAndAssertHash("yellow", "d487dd0b55dfcacdd920ccbdaeafa351");
+    }
+
+    //TODO: IllegalArgumentException would be better
+    @Test(expected = RuntimeException.class)
+    public void testMD5HashInvalidInput() throws Exception {
+        Util.md5String(null);
+    }
+
+    @Test
+    public void testWildcardToRegex() throws Exception {
+        // empty wildcard string
+        createAndAssertRegex("", "^$");
+
+        // wildcard string is a char to be replaced
+        createAndAssertRegex("*", "^.*$");
+        createAndAssertRegex("?", "^.$");
+        final String relevantChars = "()[]$^.{}|\\";
+        for (final char c : relevantChars.toCharArray()) {
+            final char oneChar[] = {c};
+            final String wildcardStr = new String(oneChar);
+            final String expectedRegex = "^\\" + c + "$";
+            createAndAssertRegex(wildcardStr, expectedRegex);
+        }
+
+        // wildcard string consists of more chars
+        createAndAssertRegex("a", "^a$");
+        createAndAssertRegex("aBc", "^aBc$");
+        createAndAssertRegex("a1b2C34", "^a1b2C34$");
+        createAndAssertRegex("*?()[]$^.{}|\\X", "^.*.\\(\\)\\[\\]\\$\\^\\.\\{\\}\\|\\\\X$");
+        createAndAssertRegex("a*BB?37|42$", "^a.*BB.37\\|42\\$$");
+    }
+
+    @Test
+    public void testResultFor() throws Exception {
+        {
+            final String expectedResult = "dummy string";
+            RpcResult<String> rpcResult = Util.resultFor(expectedResult).get();
+            assertEquals(expectedResult, rpcResult.getResult());
+            assertTrue(rpcResult.isSuccessful());
+            assertTrue(rpcResult.getErrors().isEmpty());
+        }
+        {
+            final Integer expectedResult = 42;
+            RpcResult<Integer> rpcResult = Util.resultFor(expectedResult).get();
+            assertEquals(expectedResult, rpcResult.getResult());
+            assertTrue(rpcResult.isSuccessful());
+            assertTrue(rpcResult.getErrors().isEmpty());
+        }
+    }
+
+    @Test
+    public void testExpandQname() throws Exception {
+        // match no path because the list of the allowed paths is empty
+        {
+            final List<SchemaPath> paths = new ArrayList<>();
+            final Pattern regexPattern = Pattern.compile(".*"); // match everything
+            final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+            assertTrue(matchingPaths.isEmpty());
+        }
+
+        // match no path because of regex pattern
+        {
+            final List<SchemaPath> paths = createSchemaPathList();
+            final Pattern regexPattern = Pattern.compile("^@.*");
+            final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+            assertTrue(matchingPaths.isEmpty());
+        }
+
+        // match all paths
+        {
+            final List<SchemaPath> paths = createSchemaPathList();
+            final Pattern regexPattern = Pattern.compile(".*");
+            final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+            assertTrue(matchingPaths.contains(paths.get(0)));
+            assertTrue(matchingPaths.contains(paths.get(1)));
+            assertEquals(paths.size(), matchingPaths.size());
+        }
+
+        // match one path only
+        {
+            final List<SchemaPath> paths = createSchemaPathList();
+            final Pattern regexPattern = Pattern.compile(".*yyy$");
+            final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+            assertTrue(matchingPaths.contains(paths.get(1)));
+            assertEquals(1, matchingPaths.size());
+        }
+    }
+
+    private static void createAndAssertHash(final String inString, final String expectedHash) {
+        assertEquals("Incorrect hash.", expectedHash, Util.md5String(inString));
+    }
+
+    private static void createAndAssertRegex(final String wildcardStr, final String expectedRegex) {
+        assertEquals("Incorrect regex string.", expectedRegex, Util.wildcardToRegex(wildcardStr));
+    }
+
+    private static List<SchemaPath> createSchemaPathList() {
+        final QName qname1 = QName.create("urn:odl:xxx", "2015-01-01", "localName");
+        final QName qname2 = QName.create("urn:odl:yyy", "2015-01-01", "localName");
+        final SchemaPath path1 = SchemaPath.create(true, qname1);
+        final SchemaPath path2 = SchemaPath.create(true, qname2);
+        return Arrays.asList(path1, path2);
+    }
+}
index eca5213..bf30a16 100644 (file)
@@ -84,6 +84,7 @@
     <!-- Message Bus -->
     <module>messagebus-api</module>
     <module>messagebus-impl</module>
+    <module>messagebus-config</module>
   </modules>
 
   <build>
                     </goals>
                   </pluginExecutionFilter>
                   <action>
-                    <execute></execute>
+                    <execute/>
                   </action>
                 </pluginExecution>
               </pluginExecutions>
       </modules>
     </profile>
   </profiles>
-</project>
+</project>
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.