Merge "BUG-2310 - widen yang model for netconf-node."
authorTony Tkacik <ttkacik@cisco.com>
Tue, 10 Feb 2015 21:13:18 +0000 (21:13 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 10 Feb 2015 21:13:18 +0000 (21:13 +0000)
83 files changed:
features/mdsal/src/main/resources/features.xml
features/netconf/src/main/resources/features.xml
itests/base-features-it/pom.xml
karaf/karaf-parent/pom.xml
karaf/opendaylight-karaf-empty/pom.xml
karaf/opendaylight-karaf/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/config/config-plugin-parent/pom.xml
opendaylight/config/logback-config-loader/pom.xml
opendaylight/config/logback-config/pom.xml
opendaylight/config/netty-config-api/pom.xml
opendaylight/config/netty-event-executor-config/pom.xml
opendaylight/config/netty-threadgroup-config/pom.xml
opendaylight/config/netty-timer-config/pom.xml
opendaylight/config/pom.xml
opendaylight/config/threadpool-config-api/pom.xml
opendaylight/config/threadpool-config-impl/pom.xml
opendaylight/config/yang-jmx-generator-plugin/src/test/java/org/opendaylight/controller/config/yangjmxgenerator/plugin/ModuleMXBeanEntryPluginTest.java
opendaylight/config/yang-jmx-generator/src/test/java/org/opendaylight/controller/config/yangjmxgenerator/ModuleMXBeanEntryTest.java
opendaylight/config/yang-test-plugin/pom.xml
opendaylight/config/yang-test/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-dom-broker/pom.xml
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouterEvent.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java
opendaylight/md-sal/sal-dummy-distributed-datastore/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/netconf/ietf-netconf-notifications/pom.xml [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang [new file with mode: 0644]
opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang [new file with mode: 0644]
opendaylight/netconf/netconf-artifacts/pom.xml
opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/SessionAwareNetconfOperation.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/pom.xml [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/BaseNetconfNotificationListener.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/BaseNotificationPublisherRegistration.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotification.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationCollector.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationListener.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationRegistry.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationListenerRegistration.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationPublisherRegistration.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationRegistration.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/pom.xml [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManager.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/CreateSubscription.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/Get.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/NotificationsTransformUtil.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/osgi/Activator.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManagerTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/CreateSubscriptionTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/GetTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/NotificationsTransformUtilTest.java [new file with mode: 0644]
opendaylight/netconf/pom.xml
pom.xml

index 1582f457897d8c2eabf20e833de3dcb36e946742..8c166e63821fe2197211c72b8f195db8813f9202 100644 (file)
@@ -28,6 +28,7 @@
         <feature version='${mdsal.version}'>odl-mdsal-common</feature>
         <feature version='${config.version}'>odl-config-startup</feature>
         <feature version='${config.version}'>odl-config-netty</feature>
+        <bundle>mvn:com.lmax/disruptor/${lmax.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/sal-core-api/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/sal-core-spi/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/sal-broker-impl/${project.version}</bundle>
index 9de15630c3941e63f01580a252b2faa4bf1627da..2affa27d19844a7fee6599fb2f605a19450b3f0b 100644 (file)
@@ -22,6 +22,8 @@
     <bundle>mvn:org.opendaylight.controller/netconf-api/${project.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/netconf-auth/${project.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/ietf-netconf-monitoring/${project.version}</bundle>
+    <bundle>mvn:org.opendaylight.controller/ietf-netconf/${project.version}</bundle>
+    <bundle>mvn:org.opendaylight.controller/ietf-netconf-notifications/${project.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/ietf-netconf-monitoring-extension/${project.version}</bundle>
     <bundle>mvn:org.opendaylight.yangtools.model/ietf-inet-types/${ietf-inet-types.version}</bundle>
     <bundle>mvn:org.opendaylight.yangtools.model/ietf-yang-types/${ietf-yang-types.version}</bundle>
@@ -43,6 +45,7 @@
     <feature version='${project.version}'>odl-config-netconf-connector</feature>
     <!-- Netconf will not provide schemas without monitoring -->
     <feature version='${project.version}'>odl-netconf-monitoring</feature>
+    <feature version='${project.version}'>odl-netconf-notifications-impl</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-impl/${project.version}</bundle>
   </feature>
   <feature name='odl-config-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf :: Connector">
@@ -50,6 +53,7 @@
     <feature version='${project.version}'>odl-netconf-api</feature>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
+    <feature version='${project.version}'>odl-netconf-notifications-api</feature>
     <bundle>mvn:org.opendaylight.controller/config-netconf-connector/${project.version}</bundle>
   </feature>
   <feature name='odl-netconf-netty-util' version='${project.version}' description="OpenDaylight :: Netconf :: Netty Util">
     <feature version='${project.version}'>odl-netconf-util</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-monitoring/${project.version}</bundle>
   </feature>
+  <feature name='odl-netconf-notifications-api' version='${project.version}' description="OpenDaylight :: Netconf :: Notification :: Api">
+    <feature version='${project.version}'>odl-netconf-api</feature>
+    <bundle>mvn:org.opendaylight.controller/netconf-notifications-api/${project.version}</bundle>
+  </feature>
+  <feature name='odl-netconf-notifications-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring :: Impl">
+    <feature version='${project.version}'>odl-netconf-notifications-api</feature>
+    <bundle>mvn:org.opendaylight.controller/netconf-notifications-impl/${project.version}</bundle>
+  </feature>
 
 </features>
index d05e9a515bed4c84480d83900b3b7cde42805c37..dfb622eb7c1561ae37521ec9ea9fa9017917efad 100644 (file)
@@ -25,7 +25,7 @@
         <dependency>
             <groupId>org.ops4j.pax.exam</groupId>
             <artifactId>pax-exam-container-karaf</artifactId>
-            <version>${pax.exam.version}</version>
+            <version>${exam.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -36,7 +36,7 @@
         <dependency>
             <groupId>org.ops4j.pax.exam</groupId>
             <artifactId>pax-exam</artifactId>
-            <version>${pax.exam.version}</version>
+            <version>${exam.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
index 06d8c8d99b7d9d222002f70207c17b7ea9c7ae1e..baf67302e0d49f68affda9850d15cd6910177970 100644 (file)
@@ -19,9 +19,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
   <artifactId>karaf-parent</artifactId>
   <name>${project.artifactId}</name>
   <packaging>pom</packaging>
-  <prerequisites>
-    <maven>3.1.1</maven>
-  </prerequisites>
+
   <properties>
     <branding.version>1.1.0-SNAPSHOT</branding.version>
     <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
@@ -320,6 +318,26 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       </plugins>
     </pluginManagement>
     <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>${enforcer.version}</version>
+        <executions>
+          <execution>
+            <id>enforce-maven</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireMavenVersion>
+                  <version>3.1.1</version>
+                </requireMavenVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <artifactId>maven-resources-plugin</artifactId>
       </plugin>
index a13023cbeebdc74a47a7741933202075deef4b1a..aa772096cd958307341bbff1e652224adec0aa6f 100644 (file)
@@ -9,9 +9,6 @@
   </parent>
   <artifactId>opendaylight-karaf-empty</artifactId>
   <packaging>pom</packaging>
-  <prerequisites>
-    <maven>3.0</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 9bf2309b6f034e9ea3ac37a7b06fe8d2a14b9f28..3b29fa276ec4c9b353603781946adcacd921a812 100644 (file)
@@ -9,9 +9,6 @@
   </parent>
   <artifactId>distribution.opendaylight-karaf</artifactId>
   <packaging>pom</packaging>
-  <prerequisites>
-    <maven>3.0</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 5310db30a74a74e7923409b4bd227190119fcf49..6a9b4bef2aa3b74476c3384535e2eb6219d5a3cb 100644 (file)
     <northbound.jolokia.version>1.5.0-SNAPSHOT</northbound.jolokia.version>
     <opendaylight-l2-types.version>2013.08.27.7-SNAPSHOT</opendaylight-l2-types.version>
     <osgi-brandfragment.web.version>0.1.0-SNAPSHOT</osgi-brandfragment.web.version>
-    <pax.exam.version>4.0.0</pax.exam.version>
     <parboiled.version>1.1.6</parboiled.version>
     <parboiled.scala.version>1.1.6</parboiled.scala.version>
     <propertymavenplugin.version>1.0-alpha-2</propertymavenplugin.version>
     <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
     <sshd-core.version>0.12.0</sshd-core.version>
     <jmh.version>0.9.7</jmh.version>
+    <lmax.version>3.3.0</lmax.version>
   </properties>
 
   <dependencyManagement>
         <artifactId>guava</artifactId>
         <version>${guava.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.lmax</groupId>
+        <artifactId>disruptor</artifactId>
+        <version>${lmax.version}</version>
+      </dependency>
+
       <!-- 3rd party dependencies needed by config-->
       <dependency>
         <groupId>com.jcabi</groupId>
index 67370e1e2f345b9a6b20fc72625234a202fd7b50..081df0c52de3e3b5b5fd44ad221c69516e703072 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>config-plugin-parent</artifactId>
   <packaging>pom</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <build>
     <pluginManagement>
index 0f379fbe21808b351a3f37fd20ea6d578474f9bb..94f7f8fc4932f0a7f7f6ac543b7cf607cf44455f 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>logback-config-loader</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index d918fd7ab7ad383796002509d506f8d3e353820a..d4537387aad1021913459ca14f6139d537f69dbf 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>logback-config</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index a5c0831fb82aa16ca7bdb8175a709b7690f8c110..2d8145723d1b51f5be061b9ce384d84bdf563b01 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>netty-config-api</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 6188aed898be9ade5430e01635b01291c6779072..31940b91fed2a5f59947a7a845950076e0ae6d0b 100644 (file)
@@ -12,9 +12,6 @@
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
   <description>Configuration Wrapper around netty's event executor</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 2f3d26dd2fe3a7379b1fb60422689229a3444c61..0f645015e1c453fc2e0527c962d62645f6ea6651 100644 (file)
@@ -14,9 +14,6 @@
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
   <description>Configuration Wrapper around netty's event group</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 75b4709da26349014ef4f83923bbcb909a2c8fb5..181c1d01513537f7ece1d9a5c868d47abfdfdc29 100644 (file)
@@ -12,9 +12,6 @@
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
   <description>Configuration Wrapper around netty's timer</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 4c4c5b3378f1bfb87f64544248e052ca43053cfa..6d14ad3957c6e03a51bd728b054ba6388d245075 100644 (file)
@@ -13,9 +13,7 @@
   <version>0.3.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
+
   <modules>
     <module>config-api</module>
     <module>config-manager</module>
index 5f0c941a19a7ed44ba89d792a9eb69ea0c1c68e9..9dc7bf59767d901556b0fc1e3a86d3b3282707c4 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>threadpool-config-api</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 2787b30df442d98ea0d2ae91aea4ab6eaada34fe..b875f5f3e4198f5d1dfe3b4ebe06c2cbca7d83ad 100644 (file)
@@ -11,9 +11,6 @@
   <artifactId>threadpool-config-impl</artifactId>
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 1c44a80e0b65a3ef25fd276277f2986d6fa9e447..d9f88643deb2acf762f039b752936111dd92fe01 100644 (file)
@@ -90,8 +90,8 @@ public class ModuleMXBeanEntryPluginTest extends ModuleMXBeanEntryTest {
             assertThat(runtimeBeans.size(), is(4));
 
             {
-                RuntimeBeanEntry streamRB = findFirstByYangName(runtimeBeans,
-                        "stream");
+                RuntimeBeanEntry streamRB = findFirstByNamePrefix(runtimeBeans,
+                        "ThreadStream");
                 assertNotNull(streamRB);
                 assertFalse(streamRB.getKeyYangName().isPresent());
                 assertFalse(streamRB.getKeyJavaName().isPresent());
index e116f480c52a9aad07a23231fff2bca8e1a2f002..50f38e3978a790528205fb58452c4956a646e659 100644 (file)
@@ -140,6 +140,17 @@ public class ModuleMXBeanEntryTest extends AbstractYangTest {
                 + " in " + runtimeBeans);
     }
 
+    protected RuntimeBeanEntry findFirstByNamePrefix(final Collection<RuntimeBeanEntry> runtimeBeans, final String namePrefix) {
+        for (RuntimeBeanEntry rb : runtimeBeans) {
+            if (namePrefix.equals(rb.getJavaNamePrefix())) {
+                return rb;
+            }
+        }
+
+        throw new IllegalArgumentException("Name prefix not found:" + namePrefix
+            + " in " + runtimeBeans);
+    }
+
     @Test
     public void testGetWhenConditionMatcher() {
         assertMatches("config",
@@ -247,8 +258,8 @@ public class ModuleMXBeanEntryTest extends AbstractYangTest {
                 assertThat(threadRB.getRpcs().size(), is(2));
             }
             {
-                RuntimeBeanEntry streamRB = findFirstByYangName(runtimeBeans,
-                        "stream");
+                RuntimeBeanEntry streamRB = findFirstByNamePrefix(runtimeBeans,
+                        "ThreadStream");
                 assertNotNull(streamRB);
                 assertFalse(streamRB.getKeyYangName().isPresent());
                 assertFalse(streamRB.getKeyJavaName().isPresent());
index d03cff305b68bc39cddf9f75f7996fd98d3fc5b8..690f8d24e6746671c4032360885670f3ff2e2467 100644 (file)
@@ -12,9 +12,6 @@
   <name>${project.artifactId}</name>
 
   <description>Remove generated source files, after new files generation, implementation is inserted.</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 5977325574202c3edd39125dc6ca60f4ebd5fd71..f5c966d5a6e9472d8c98211b087a9131171b8d23 100644 (file)
@@ -14,9 +14,6 @@
   <name>${project.artifactId}</name>
 
   <description>Artifact that contains only generated code from yang files. Suitable for testing.</description>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <dependencies>
     <dependency>
index 6d0c14e733a8c81bb2e29a663915eb5776422a6d..73c81afd187d2791703c1428beacbc8348abda69 100644 (file)
@@ -73,4 +73,12 @@ public interface FollowerLogInformation {
      * This will stop the timeout clock
      */
     void markFollowerInActive();
+
+
+    /**
+     * This will return the active time of follower, since it was last reset
+     * @return time in milliseconds
+     */
+    long timeSinceLastActivity();
+
 }
index 7a690d3d18be84433f9e37c874a88b277b83f7cb..0fed63098d6da1edfb6229245ffd63d1e4e4b7df 100644 (file)
@@ -95,4 +95,9 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
             stopwatch.stop();
         }
     }
+
+    @Override
+    public long timeSinceLastActivity() {
+        return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    }
 }
index 77bf10370184894d6a510990e08fb1a8bc785a84..feccea7edba37b5faff9782cf986146f0a784af9 100644 (file)
@@ -12,7 +12,8 @@ import java.util.List;
 
 
 public class Snapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -8298574936724056236L;
+
     private final byte[] state;
     private final List<ReplicatedLogEntry> unAppliedEntries;
     private final long lastIndex;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java
deleted file mode 100644 (file)
index 6335e32..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to commit an entry to the log
- */
-public class CommitEntry implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java
deleted file mode 100644 (file)
index 68ecc12..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to Persist an entry into the transaction journal
- */
-public class PersistEntry implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java
deleted file mode 100644 (file)
index 7b7f085..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * This message is sent by a RaftActor to itself so that a subclass can process
- * it and use it to save it's state
- */
-public class SaveSnapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
index e28e4b066d372ee54594ecaf9fe0e5259ad4cdfd..8f33d94700bc4a87231c44ab66758bd7c28bf049 100644 (file)
@@ -232,6 +232,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             purgeInMemoryLog();
         }
 
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false);
         return this;
     }
 
@@ -294,6 +296,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+                        rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
                 return switchBehavior(new Follower(context));
@@ -330,12 +335,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot == null) {
+            LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+                    context.getId(), followerId);
+            return;
+        }
+
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
         followerLogInformation.markFollowerActive();
 
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+        if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+            boolean wasLastChunk = false;
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
@@ -363,6 +374,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         // we can remove snapshot from the memory
                         setSnapshot(Optional.<ByteString>absent());
                     }
+                    wasLastChunk = true;
 
                 } else {
                     followerToSnapshot.markSendStatus(true);
@@ -374,11 +386,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerToSnapshot.markSendStatus(false);
             }
 
+            if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+                ActorSelection followerActor = context.getPeerActorSelection(followerId);
+                if(followerActor != null) {
+                    sendSnapshotChunk(followerActor, followerId);
+                }
+            }
+
         } else {
-            LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                    context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
+            LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+                    context.getId(), reply.getChunkIndex(), followerId,
+                    followerToSnapshot.getChunkIndex());
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
@@ -413,75 +431,94 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
+        long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+            final FollowerLogInformation followerLogInformation = e.getValue();
+            // This checks helps not to send a repeat message to the follower
+            if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+                sendUpdatesToFollower(followerId, followerLogInformation, true);
+            }
+        }
+    }
 
-            if (followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex();
-                boolean isFollowerActive = followerLogInformation.isFollowerActive();
-
-                if (mapFollowerToSnapshot.get(followerId) != null) {
-                    // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
-                        sendSnapshotChunk(followerActor, followerId);
-                    } else {
-                        // we send a heartbeat even if we have not received a reply for the last chunk
-                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
-                    }
+    /**
+     *
+     * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+     * sending next snapshot chunk, and initiating a snapshot.
+     * @return true if any update is sent, false otherwise
+     */
 
-                } else {
-                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-                    final List<ReplicatedLogEntry> entries;
-
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
-                        // FIXME : Sending one entry at a time
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    } else if (isFollowerActive && followerNextIndex >= 0 &&
-                        leaderLastIndex >= followerNextIndex ) {
-                        // if the followers next index is not present in the leaders log, and
-                        // if the follower is just not starting and if leader's index is more than followers index
-                        // then snapshot should be sent
-
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
-                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
-                                    "leader-last-index: %s", context.getId(), followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                        }
-                        actor().tell(new InitiateInstallSnapshot(), actor());
-
-                        // we would want to sent AE as the capture snapshot might take time
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
-
-                    } else {
-                        //we send an AppendEntries, even if the follower is inactive
-                        // in-order to update the followers timestamp, in case it becomes active again
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+    private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+                                          boolean sendHeartbeat) {
+
+        ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        if (followerActor != null) {
+            long followerNextIndex = followerLogInformation.getNextIndex();
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+
+            if (mapFollowerToSnapshot.get(followerId) != null) {
+                // if install snapshot is in process , then sent next chunk if possible
+                if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    sendSnapshotChunk(followerActor, followerId);
+                } else if(sendHeartbeat) {
+                    // we send a heartbeat even if we have not received a reply for the last chunk
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                }
+            } else {
+                long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                if (isFollowerActive &&
+                    context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    // FIXME : Sending one entry at a time
+                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+
+                } else if (isFollowerActive && followerNextIndex >= 0 &&
+                    leaderLastIndex >= followerNextIndex) {
+                    // if the followers next index is not present in the leaders log, and
+                    // if the follower is just not starting and if leader's index is more than followers index
+                    // then snapshot should be sent
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                            followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+                        );
                     }
+                    actor().tell(new InitiateInstallSnapshot(), actor());
 
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+                    // Send heartbeat to follower whenever install snapshot is initiated.
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
 
+                } else if(sendHeartbeat) {
+                    //we send an AppendEntries, even if the follower is inactive
+                    // in-order to update the followers timestamp, in case it becomes active again
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
                 }
+
             }
         }
     }
 
     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex(),
-                replicatedToAllIndex).toSerializable(),
-            actor()
-        );
+        List<ReplicatedLogEntry> entries, String followerId) {
+        AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+            prevLogIndex(followerNextIndex),
+            prevLogTerm(followerNextIndex), entries,
+            context.getCommitIndex(), replicatedToAllIndex);
+
+        if(!entries.isEmpty()) {
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+                    appendEntries);
+        }
+
+        followerActor.tell(appendEntries.toSerializable(), actor());
     }
 
     /**
@@ -501,6 +538,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+        }
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
@@ -508,14 +549,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                        context.getReplicatedLog().isInSnapshot(nextIndex)) {
                     LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
                         sendSnapshotChunk(followerActor, e.getKey());
 
-                    } else {
+                    } else if (!context.isSnapshotCaptureInitiated()) {
                         initiateCaptureSnapshot();
                         //we just need 1 follower who would need snapshot to be installed.
                         // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
@@ -548,6 +589,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
                 lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
             actor());
+        context.setSnapshotCaptureInitiated(true);
     }
 
 
@@ -573,21 +615,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
+                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+                // followerId to the followerToSnapshot map.
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
-                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+                        nextSnapshotChunk,
+                            followerToSnapshot.incrementChunkIndex(),
+                            followerToSnapshot.getTotalChunks(),
+                        Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
                         context.getId(), followerActor.path(),
-                        mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                        followerToSnapshot.getChunkIndex(),
+                        followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
             LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
index a782eda5659d9389606ea4e7606a5f61dd949c07..32ed85b281fee0feb4b0533c0b72cb5ade40093a 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages;
  * Reply for the AppendEntriesRpc message
  */
 public class AppendEntriesReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -7487547356392536683L;
 
     // true if follower contained entry matching
     // prevLogIndex and prevLogTerm
@@ -38,6 +38,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.logLastTerm = logLastTerm;
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index 71e7ecc189fc882eaa0917b19af99376ed05f1b7..15621bf894b14f75ec845821654202877f4b4382 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 public class InstallSnapshotReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 642227896390779503L;
 
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
     private final int chunkIndex;
-    private boolean success;
+    private final boolean success;
 
     public InstallSnapshotReply(long term, String followerId, int chunkIndex,
         boolean success) {
index 8321d0c25bcecce32617c55fe652c2b7be189541..9ba5acb664700f4873d51fdb9f2637ef882c360e 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages;
  * Invoked by candidates to gather votes (§5.2).
  */
 public class RequestVote extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -6967509186297108657L;
 
     // candidate requesting vote
     private String candidateId;
@@ -35,6 +35,7 @@ public class RequestVote extends AbstractRaftRPC {
     public RequestVote() {
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index da3ba5c39f6989b86c1f76b7830d2c0426302ddb..b3c95d6ecaab2b19524fb4ae06c3c8f6491658c0 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 public class RequestVoteReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 8427899326488775660L;
 
     // true means candidate received vot
     private final boolean voteGranted;
@@ -19,6 +19,7 @@ public class RequestVoteReply extends AbstractRaftRPC {
         this.voteGranted = voteGranted;
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index 30893810f5a9bee6542fff36f81694380844a06c..cf7af439e5c5cb362975bdee0008b8941e74dd45 100644 (file)
@@ -46,6 +46,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -1119,6 +1120,88 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
+    @Test
+    public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "leader1";
+
+                ActorRef followerActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+                ActorRef followerActor2 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1", followerActor1.path().toString());
+                peerAddresses.put("follower-2", followerActor2.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor leaderActor = mockActorRef.underlyingActor();
+                leaderActor.getRaftActorContext().setCommitIndex(9);
+                leaderActor.getRaftActorContext().setLastApplied(9);
+                leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                leaderActor.waitForInitializeBehaviorComplete();
+
+                Leader leader = new Leader(leaderActor.getRaftActorContext());
+                leaderActor.setCurrentBehavior(leader);
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                // create 5 entries in the log
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+                //set the snapshot index to 4 , 0 to 4 are snapshotted
+                leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+                assertEquals(5, leaderActor.getReplicatedLog().size());
+
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1));
+                assertEquals(5, leaderActor.getReplicatedLog().size());
+
+                // set the 2nd follower nextIndex to 1 which has been snapshotted
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1));
+                assertEquals(5, leaderActor.getReplicatedLog().size());
+
+                // simulate a real snapshot
+                leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
+                assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                //reply from a slow follower does not initiate a fake snapshot
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
+                assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+
+                //reply from a slow follower after should not raise errors
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1));
+                assertEquals(0, leaderActor.getReplicatedLog().size());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index b31cb621b3576b1a9bcbaff321465d4bd186674e..666cea69ec891754687b262db72f26d9012fbb0c 100644 (file)
@@ -1,8 +1,5 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -46,6 +43,10 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     private final ActorRef leaderActor =
@@ -86,6 +87,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out =
@@ -133,6 +137,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -270,6 +277,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leader.getFollowerToSnapshot().getNextChunk();
             leader.getFollowerToSnapshot().incrementChunkIndex();
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
@@ -344,6 +354,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             //update follower timestamp
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 senderActor, new Replicate(null, "state-id", entry));
@@ -432,6 +445,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(1, cs.getLastAppliedTerm());
             assertEquals(4, cs.getLastIndex());
             assertEquals(2, cs.getLastTerm());
+
+            // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+            raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+            List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+            assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+
         }};
     }
 
@@ -576,9 +595,102 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            };
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            actorContext.setConfigParams(configParams);
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(1, objectList.size());
+
+            Object o = objectList.get(0);
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(2, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(3, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            // Count should still stay at 3
+            assertEquals(3, objectList.size());
+        }};
+    }
+
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
         new JavaTestKit(getSystem()) {{
 
             TestActorRef<MessageCollectorActor> followerActor =
@@ -632,11 +744,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(3, installSnapshot.getTotalChunks());
 
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
 
             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
 
@@ -655,7 +771,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             {
 
                 TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(followerActor.path().toString(),
@@ -709,10 +825,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                leader.handleMessage(leaderActor, new SendHeartBeat());
-
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
 
                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
@@ -874,6 +990,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -942,6 +1061,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -1170,6 +1291,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive("follower-reply");
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+                .getFirstMatching(followerActor, AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            List<Object> entries = ForwardMessageToBehaviorActor
+                .getAllMatching(followerActor, AppendEntries.class);
+
+            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+            assertEquals(1, appendEntriesSecond.getLeaderCommit());
+            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+        }};
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;
index 9b6d5836f0bd2ac9cdc5704740f392f14c6cdbf3..a64e3600f5023e90ced07f30f12145eb4ecbf02c 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.controller.test.sal.binding.it;
 import static org.ops4j.pax.exam.CoreOptions.frameworkProperty;
 import static org.ops4j.pax.exam.CoreOptions.junitBundles;
 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.systemPackages;
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
-
 import org.ops4j.pax.exam.Option;
 import org.ops4j.pax.exam.options.DefaultCompositeOption;
 import org.ops4j.pax.exam.util.PathUtils;
@@ -47,7 +47,7 @@ public class TestHelper {
                 bindingAwareSalBundles(),
                 mavenBundle("commons-codec", "commons-codec").versionAsInProject(),
 
-                systemProperty("org.osgi.framework.system.packages.extra").value("sun.nio.ch"),
+                systemPackages("sun.nio.ch", "sun.misc"),
                 mavenBundle("io.netty", "netty-common").versionAsInProject(), //
                 mavenBundle("io.netty", "netty-buffer").versionAsInProject(), //
                 mavenBundle("io.netty", "netty-handler").versionAsInProject(), //
@@ -123,7 +123,8 @@ public class TestHelper {
                 mavenBundle(CONTROLLER, "sal-common-util").versionAsInProject(), // //
 
 
-                mavenBundle(CONTROLLER, "sal-inmemory-datastore").versionAsInProject(), // /
+                mavenBundle("com.lmax", "disruptor").versionAsInProject(),
+                mavenBundle(CONTROLLER, "sal-inmemory-datastore").versionAsInProject(), //
                 mavenBundle(CONTROLLER, "sal-broker-impl").versionAsInProject(), // //
                 mavenBundle(CONTROLLER, "sal-core-spi").versionAsInProject().update(), //
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java
new file mode 100644 (file)
index 0000000..da60496
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain
+ * valid serialized data.
+ *
+ * @author Thomas Pantelis
+ */
+public class InvalidNormalizedNodeStreamException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidNormalizedNodeStreamException(String message) {
+        super(message);
+    }
+}
index cde338179ba727903e35d4513195cbbddd9b1b1c..bb2f5d41d920d5ce97c723fe6bc91cafc5cd6031 100644 (file)
@@ -69,6 +69,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     private final StringBuilder reusableStringBuilder = new StringBuilder(50);
 
+    private boolean readSignatureMarker = true;
+
     public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         input = new DataInputStream(stream);
@@ -80,6 +82,25 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     @Override
     public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readNormalizedNodeInternal();
+    }
+
+    private void readSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(readSignatureMarker) {
+            readSignatureMarker = false;
+
+            byte marker = input.readByte();
+            if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) {
+                throw new InvalidNormalizedNodeStreamException(String.format(
+                        "Invalid signature marker: %d", marker));
+            }
+
+            input.readShort(); // read the version - not currently used/needed.
+        }
+    }
+
+    private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
         // each node should start with a byte
         byte nodeType = input.readByte();
 
@@ -284,7 +305,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
                 return bytes;
 
             case ValueTypes.YANG_IDENTIFIER_TYPE :
-                return readYangInstanceIdentifier();
+                return readYangInstanceIdentifierInternal();
 
             default :
                 return null;
@@ -292,6 +313,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
     }
 
     public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readYangInstanceIdentifierInternal();
+    }
+
+    private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException {
         int size = input.readInt();
 
         List<PathArgument> pathArguments = new ArrayList<>(size);
@@ -342,11 +368,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
         lastLeafSetQName = nodeType;
 
-        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
 
         while(child != null) {
             builder.withChild(child);
-            child = (LeafSetEntryNode<Object>)readNormalizedNode();
+            child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
         }
         return builder;
     }
@@ -356,11 +382,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
             NormalizedNodeContainerBuilder builder) throws IOException {
         LOG.debug("Reading data container (leaf nodes) nodes");
 
-        NormalizedNode<?, ?> child = readNormalizedNode();
+        NormalizedNode<?, ?> child = readNormalizedNodeInternal();
 
         while(child != null) {
             builder.addChild(child);
-            child = readNormalizedNode();
+            child = readNormalizedNodeInternal();
         }
         return builder;
     }
index 088f4dfbe98a1358a980a84e8ad3adaa99736b57..d4aab036be21df1734f4f88bc590b35030a71d21 100644 (file)
@@ -46,6 +46,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
 
+    static final byte SIGNATURE_MARKER = (byte) 0xab;
+    static final short CURRENT_VERSION = (short) 1;
+
     static final byte IS_CODE_VALUE = 1;
     static final byte IS_STRING_VALUE = 2;
     static final byte IS_NULL_VALUE = 3;
@@ -56,6 +59,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private NormalizedNodeWriter normalizedNodeWriter;
 
+    private boolean wroteSignatureMarker;
+
     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         output = new DataOutputStream(stream);
@@ -74,9 +79,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
         normalizedNodeWriter().write(node);
     }
 
+    private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(!wroteSignatureMarker) {
+            output.writeByte(SIGNATURE_MARKER);
+            output.writeShort(CURRENT_VERSION);
+            wroteSignatureMarker = true;
+        }
+    }
+
     @Override
     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
         Preconditions.checkNotNull(name, "Node identifier should not be null");
@@ -201,6 +215,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     private void startNode(final QName qName, byte nodeType) throws IOException {
 
         Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+        writeSignatureMarkerAndVersionIfNeeded();
+
         // First write the type of node
         output.writeByte(nodeType);
         // Write Start Tag
@@ -247,6 +264,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
+        writeYangInstanceIdentifierInternal(identifier);
+    }
+
+    private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
         Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
         int size = Iterables.size(pathArguments);
         output.writeInt(size);
@@ -363,7 +385,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
                 output.write(bytes);
                 break;
             case ValueTypes.YANG_IDENTIFIER_TYPE:
-                writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+                writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
                 break;
             case ValueTypes.NULL_TYPE :
                 break;
index 6528f2e4d2e6524f2f08acb750aae764c86dbc0b..67a342b440666e3febfd6e5b390953fa3d54bebf 100644 (file)
@@ -15,15 +15,16 @@ import java.io.IOException;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
@@ -33,9 +34,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class NormalizedNodeStreamReaderWriterTest {
 
     @Test
-    public void testNormalizedNodeStreamReaderWriter() throws IOException {
+    public void testNormalizedNodeStreaming() throws IOException {
 
-        testNormalizedNodeStreamReaderWriter(createTestContainer());
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+
+        NormalizedNode<?, ?> testContainer = createTestContainer();
+        writer.writeNormalizedNode(testContainer);
 
         QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
         QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
@@ -43,9 +48,21 @@ public class NormalizedNodeStreamReaderWriterTest {
                 withNodeIdentifier(new NodeIdentifier(toaster)).
                 withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build();
 
-        testNormalizedNodeStreamReaderWriter(Builders.containerBuilder().
+        ContainerNode toasterContainer = Builders.containerBuilder().
                 withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)).
-                withChild(toasterNode).build());
+                withChild(toasterNode).build();
+        writer.writeNormalizedNode(toasterContainer);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        NormalizedNode<?,?> node = reader.readNormalizedNode();
+        Assert.assertEquals(testContainer, node);
+
+        node = reader.readNormalizedNode();
+        Assert.assertEquals(toasterContainer, node);
+
+        writer.close();
     }
 
     private NormalizedNode<?, ?> createTestContainer() {
@@ -76,24 +93,75 @@ public class NormalizedNodeStreamReaderWriterTest {
                 build();
     }
 
-    private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
+    @Test
+    public void testYangInstanceIdentifierStreaming() throws IOException  {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
 
-        byte[] byteData = null;
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer =
+                new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+        writer.writeYangInstanceIdentifier(path);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test
+    public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException {
 
-        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
 
-            NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
-            normalizedNodeWriter.write(input);
-            byteData = byteArrayOutputStream.toByteArray();
+        NormalizedNode<?, ?> testContainer = TestModel.createBaseTestContainerBuilder().build();
+        writer.writeNormalizedNode(testContainer);
 
-        }
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
+
+        writer.writeYangInstanceIdentifier(path);
 
         NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
-                new ByteArrayInputStream(byteData));
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
 
         NormalizedNode<?,?> node = reader.readNormalizedNode();
-        Assert.assertEquals(input, node);
+        Assert.assertEquals(testContainer, node);
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidNormalizedNodeStream() throws IOException {
+        byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode(
+                TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readNormalizedNode();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidYangInstanceIdentifierStream() throws IOException {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build();
+
+        byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+                InstanceIdentifierUtils.toSerializable(path)).build().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readYangInstanceIdentifier();
     }
 
     @Test
index 10876045ae272c436e54143c8a0da8bf1c2e41e7..22e2dbd47d4d7148c8e41cfe05ce8532df527466 100644 (file)
@@ -27,6 +27,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -46,15 +55,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -555,7 +555,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = -8884620101025936590L;
+
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){
index 6f8d0567d9aa162d4080361162fe709decd544ce..2e66ef918e6e7541592449e9b51405610f4826a6 100644 (file)
@@ -96,4 +96,9 @@ public class ShardReadTransaction extends ShardTransaction {
     protected DOMStoreTransaction getDOMStoreTransaction() {
         return transaction;
     }
+
+    @Override
+    protected boolean returnCloseTransactionReply() {
+        return false;
+    }
 }
index 678b7815693382179328a8c13adb567d80d61b13..8a37dfee4d8ccc698a5bb9096d4ecdf9ed228771 100644 (file)
@@ -39,12 +39,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
  * </p>
  * <p>
- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
- * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
- * time there are no known advantages for creating a read-only or write-only transaction which may change over time
- * at which point we can optimize things in the distributed store as well.
- * </p>
- * <p>
  * Handles Messages <br/>
  * ---------------- <br/>
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
@@ -114,10 +108,14 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         }
     }
 
+    protected boolean returnCloseTransactionReply() {
+        return true;
+    }
+
     private void closeTransaction(boolean sendReply) {
         getDOMStoreTransaction().close();
 
-        if(sendReply) {
+        if(sendReply && returnCloseTransactionReply()) {
             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
         }
 
index 5854932a6fa0d999fe368aa61bfdd252821739c6..bf9f8d803ac00f0e8ce2cbe983f81bfb9e9a44d0 100644 (file)
@@ -17,6 +17,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -93,15 +94,19 @@ public final class SerializationUtils {
     }
 
     public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
-            try {
-                boolean present = in.readBoolean();
-                if(present) {
-                    NormalizedNodeInputStreamReader streamReader = streamReader(in);
-                    return streamReader.readNormalizedNode();
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
-            }
+        try {
+            return tryDeserializeNormalizedNode(in);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+        }
+    }
+
+    private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+        boolean present = in.readBoolean();
+        if(present) {
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
+            return streamReader.readNormalizedNode();
+        }
 
         return null;
     }
@@ -109,18 +114,17 @@ public final class SerializationUtils {
     public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
         NormalizedNode<?, ?> node = null;
         try {
-            node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
-        } catch(Exception e) {
-        }
-
-        if(node == null) {
-            // Must be from legacy protobuf serialization - try that.
+            node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+        } catch(InvalidNormalizedNodeStreamException e) {
+            // Probably from legacy protobuf serialization - try that.
             try {
                 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
                 node =  new NormalizedNodeToNodeCodec(null).decode(serializedNode);
-            } catch (InvalidProtocolBufferException e) {
+            } catch (InvalidProtocolBufferException e2) {
                 throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
             }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
         }
 
         return node;
index 69dd706f37cb3bd8c1e7c75d91ec05e2b0fe1f13..851fb0114b3a64c7d211a5c6375c14c41e715e69 100644 (file)
@@ -412,10 +412,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveCloseTransaction() throws Exception {
+    public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testCloseTransaction");
+                    "testReadWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
 
@@ -426,6 +426,35 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testWriteTxOnReceiveCloseTransaction");
+
+            watch(transaction);
+
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+            expectTerminated(duration("3 seconds"), transaction);
+        }};
+    }
+
+    @Test
+    public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+                    "testReadOnlyTxOnReceiveCloseTransaction");
+
+            watch(transaction);
+
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), Terminated.class);
+        }};
+    }
+
     @Test(expected=UnknownMessageException.class)
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
index a824792cf84d5fb530198ea00bd5e326faed0a49..477ddeabdf0ce6ee076335a001462f486b317fd6 100644 (file)
@@ -15,8 +15,8 @@
       <artifactId>guava</artifactId>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
         <artifactId>ietf-yang-types</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java
new file mode 100644 (file)
index 0000000..aac425b
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableMultimap.Builder;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
+ * routing of notifications from publishers to subscribers.
+ *
+ * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications
+ * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration
+ * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we
+ * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize
+ * on this instance, notifications do not take any locks here.
+ *
+ * The fully-blocking {@link #publish(long, DOMNotification, Collection)} and non-blocking {@link #offerNotification(DOMNotification)}
+ * are realized using the Disruptor's native operations. The bounded-blocking {@link #offerNotification(DOMNotification, long, TimeUnit)}
+ * is realized by arming a background wakeup interrupt.
+ */
+public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService {
+    private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
+    private static final WaitStrategy DEFAULT_STRATEGY = new SleepingWaitStrategy();
+    private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS = new EventHandler<DOMNotificationRouterEvent>() {
+        @Override
+        public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) throws Exception {
+            event.deliverNotification();
+
+        }
+    };
+    private static final EventHandler<DOMNotificationRouterEvent> NOTIFY_FUTURE = new EventHandler<DOMNotificationRouterEvent>() {
+        @Override
+        public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) {
+            event.setFuture();
+        }
+    };
+
+    private final Disruptor<DOMNotificationRouterEvent> disruptor;
+    private final ExecutorService executor;
+    private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners = ImmutableMultimap.of();
+
+    private DOMNotificationRouter(final ExecutorService executor, final Disruptor<DOMNotificationRouterEvent> disruptor) {
+        this.executor = Preconditions.checkNotNull(executor);
+        this.disruptor = Preconditions.checkNotNull(disruptor);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static DOMNotificationRouter create(final int queueDepth) {
+        final ExecutorService executor = Executors.newCachedThreadPool();
+        final Disruptor<DOMNotificationRouterEvent> disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY);
+
+        disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
+        disruptor.start();
+
+        return new DOMNotificationRouter(executor, disruptor);
+    }
+
+    @Override
+    public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener, final Collection<SchemaPath> types) {
+        final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
+            @Override
+            protected void removeRegistration() {
+                final ListenerRegistration<T> me = this;
+
+                synchronized (DOMNotificationRouter.this) {
+                    listeners = ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, new Predicate<ListenerRegistration<? extends DOMNotificationListener>>() {
+                        @Override
+                        public boolean apply(final ListenerRegistration<? extends DOMNotificationListener> input) {
+                            return input != me;
+                        }
+                    }));
+                }
+            }
+        };
+
+        if (!types.isEmpty()) {
+            final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b = ImmutableMultimap.builder();
+            b.putAll(listeners);
+
+            for (SchemaPath t : types) {
+                b.put(t, reg);
+            }
+
+            listeners = b.build();
+        }
+
+        return reg;
+    }
+
+    @Override
+    public <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener, final SchemaPath... types) {
+        return registerNotificationListener(listener, Arrays.asList(types));
+    }
+
+    private ListenableFuture<Void> publish(final long seq, final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+        final DOMNotificationRouterEvent event = disruptor.get(seq);
+        final ListenableFuture<Void> future = event.initialize(notification, subscribers);
+        disruptor.getRingBuffer().publish(seq);
+        return future;
+    }
+
+    @Override
+    public ListenableFuture<? extends Object> putNotification(final DOMNotification notification) throws InterruptedException {
+        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
+        if (subscribers.isEmpty()) {
+            return NO_LISTENERS;
+        }
+
+        final long seq = disruptor.getRingBuffer().next();
+        return publish(seq, notification, subscribers);
+    }
+
+    private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+        final long seq;
+        try {
+             seq = disruptor.getRingBuffer().tryNext();
+        } catch (InsufficientCapacityException e) {
+            return DOMNotificationPublishService.REJECTED;
+        }
+
+        return publish(seq, notification, subscribers);
+    }
+
+    @Override
+    public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
+        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
+        if (subscribers.isEmpty()) {
+            return NO_LISTENERS;
+        }
+
+        return tryPublish(notification, subscribers);
+    }
+
+    @Override
+    public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
+            final TimeUnit unit) throws InterruptedException {
+        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
+        if (subscribers.isEmpty()) {
+            return NO_LISTENERS;
+        }
+
+        // Attempt to perform a non-blocking publish first
+        final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+        if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
+            return noBlock;
+        }
+
+        /*
+         * FIXME: we need a background thread, which will watch out for blocking too long. Here
+         *        we will arm a tasklet for it and synchronize delivery of interrupt properly.
+         */
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void close() {
+        disruptor.shutdown();
+        executor.shutdown();
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouterEvent.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouterEvent.java
new file mode 100644 (file)
index 0000000..65c7166
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.lmax.disruptor.EventFactory;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A single notification event in the disruptor ringbuffer. These objects are reused,
+ * so they do have mutable state.
+ */
+final class DOMNotificationRouterEvent {
+    public static final EventFactory<DOMNotificationRouterEvent> FACTORY = new EventFactory<DOMNotificationRouterEvent>() {
+        @Override
+        public DOMNotificationRouterEvent newInstance() {
+            return new DOMNotificationRouterEvent();
+        }
+    };
+
+    private Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers;
+    private DOMNotification notification;
+    private SettableFuture<Void> future;
+
+    private DOMNotificationRouterEvent() {
+        // Hidden on purpose, initialized in initialize()
+    }
+
+    ListenableFuture<Void> initialize(final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+        this.notification = Preconditions.checkNotNull(notification);
+        this.subscribers = Preconditions.checkNotNull(subscribers);
+        this.future = SettableFuture.create();
+        return this.future;
+    }
+
+    void deliverNotification() {
+        for (ListenerRegistration<? extends DOMNotificationListener> r : subscribers) {
+            final DOMNotificationListener l = r.getInstance();
+            if (l != null) {
+                l.onNotification(notification);
+            }
+        }
+    }
+
+    void setFuture() {
+        future.set(null);
+    }
+
+}
\ No newline at end of file
index c3a56ed454024b3cb316c1e5bec9d7184c897edc..961b6c7b9312948ad862fa5236f7ab66ed834534 100644 (file)
@@ -235,7 +235,7 @@ public final class PingPongTransactionChain implements DOMTransactionChain {
          */
         final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
         Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
-        LOG.debug("Transaction {} readied");
+        LOG.debug("Transaction {} readied", tx);
 
         /*
          * We do not see a transaction being in-flight, so we need to take care of dispatching
index d8d1a76a70f4db9c41f3e94255101f07eb4cbd8d..c7ee3a5c0c8a2346dabae2c4045a67839a4ab312 100644 (file)
           <version>1.2.0-SNAPSHOT</version>
       </dependency>
 
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-distributed-datastore</artifactId>
+      </dependency>
 
     <!-- Test Dependencies -->
     <dependency>
index 4bb5258b4005f21eabc62c2707f2d3edca59b229..5d780be641e2cce6d2b48a2628cd675f9a24c8ee 100644 (file)
@@ -9,29 +9,28 @@ package org.opendaylight.controller.remote.rpc.messages;
 
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-
 import java.io.Serializable;
+import org.opendaylight.yangtools.yang.common.QName;
 
 public class ExecuteRpc implements Serializable {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1128904894827335676L;
 
-  private final String inputCompositeNode;
-  private final QName rpc;
+    private final String inputCompositeNode;
+    private final QName rpc;
 
-  public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
-    Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
-    Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
+    public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
+        Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
+        Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
 
-    this.inputCompositeNode = inputCompositeNode;
-    this.rpc = rpc;
-  }
+        this.inputCompositeNode = inputCompositeNode;
+        this.rpc = rpc;
+    }
 
-  public String getInputCompositeNode() {
-    return inputCompositeNode;
-  }
+    public String getInputCompositeNode() {
+        return inputCompositeNode;
+    }
 
-  public QName getRpc() {
-    return rpc;
-  }
+    public QName getRpc() {
+        return rpc;
+    }
 }
index 652569b7baf705ea0efa5c4d35f530edf85942f0..9c40dbfc58a556bc28f337c9ee683b947846c045 100644 (file)
@@ -8,37 +8,36 @@
 package org.opendaylight.controller.remote.rpc.messages;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.io.Serializable;
-
 public class InvokeRpc implements Serializable {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -2813459607858108953L;
 
-  private final QName rpc;
-  private final YangInstanceIdentifier identifier;
-  private final CompositeNode input;
+    private final QName rpc;
+    private final YangInstanceIdentifier identifier;
+    private final CompositeNode input;
 
-  public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
-    Preconditions.checkNotNull(rpc, "rpc qname should not be null");
-    Preconditions.checkNotNull(input, "rpc input should not be null");
+    public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
+        Preconditions.checkNotNull(rpc, "rpc qname should not be null");
+        Preconditions.checkNotNull(input, "rpc input should not be null");
 
-    this.rpc = rpc;
-    this.identifier = identifier;
-    this.input = input;
-  }
+        this.rpc = rpc;
+        this.identifier = identifier;
+        this.input = input;
+    }
 
-  public QName getRpc() {
-    return rpc;
-  }
+    public QName getRpc() {
+        return rpc;
+    }
 
-  public YangInstanceIdentifier getIdentifier() {
-    return identifier;
-  }
+    public YangInstanceIdentifier getIdentifier() {
+        return identifier;
+    }
 
-  public CompositeNode getInput() {
-    return input;
-  }
+    public CompositeNode getInput() {
+        return input;
+    }
 }
index 387cb90112dad6ffbab6fb7cce7bf7c1d92442f1..e6b208cb6fe63d64eca185b9d90d2d19144ed440 100644 (file)
@@ -10,14 +10,15 @@ package org.opendaylight.controller.remote.rpc.messages;
 import java.io.Serializable;
 
 public class RpcResponse implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private final String resultCompositeNode;
+    private static final long serialVersionUID = -4211279498688989245L;
 
-  public RpcResponse(final String resultCompositeNode) {
-    this.resultCompositeNode = resultCompositeNode;
-  }
+    private final String resultCompositeNode;
 
-  public String getResultCompositeNode() {
-    return resultCompositeNode;
-  }
+    public RpcResponse(final String resultCompositeNode) {
+        this.resultCompositeNode = resultCompositeNode;
+    }
+
+    public String getResultCompositeNode() {
+        return resultCompositeNode;
+    }
 }
index 52b1106c873872609e4300abe52d558ee89a6011..f67657f6927801931fae2fd0434481169f2f3de8 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 public class RoutingTable implements Copier<RoutingTable>, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 5592610415175278760L;
 
     private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
     private ActorRef router;
index b81175e9a253176870ddf92901d8d2debaa4d698..4c4573d909491f0414d6bd00ebac5eb35a2c7b94 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import java.io.Serializable;
 
 public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 294779770032719196L;
 
     private Long version = System.currentTimeMillis();
 
index b05bd7d0f6f314c990e4a1d7909115b6c1ad1348..00437e7e5639fc382e8b1434d4b02495ef1ac56e 100644 (file)
@@ -46,7 +46,8 @@ public class Messages {
         }
 
         public static class ContainsBuckets implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -4940160367495308286L;
+
             private final Map<Address, Bucket> buckets;
 
             public ContainsBuckets(Map<Address, Bucket> buckets){
@@ -87,7 +88,8 @@ public class Messages {
         }
 
         public static class ContainsBucketVersions implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -8172148925383801613L;
+
             Map<Address, Long> versions;
 
             public ContainsBucketVersions(Map<Address, Long> versions) {
@@ -119,15 +121,16 @@ public class Messages {
 
     public static class GossiperMessages{
         public static class Tick implements Serializable {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -4770935099506366773L;
         }
 
         public static final class GossipTick extends Tick {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 5803354404380026143L;
         }
 
         public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -593037395143883265L;
+
             private final Address from;
 
             public GossipStatus(Address from, Map<Address, Long> versions) {
@@ -141,7 +144,8 @@ public class Messages {
         }
 
         public static final class GossipEnvelope extends ContainsBuckets implements Serializable {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 8346634072582438818L;
+
             private final Address from;
             private final Address to;
 
diff --git a/opendaylight/netconf/ietf-netconf-notifications/pom.xml b/opendaylight/netconf/ietf-netconf-notifications/pom.xml
new file mode 100644 (file)
index 0000000..1ce3b03
--- /dev/null
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>netconf-subsystem</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>ietf-netconf-notifications</artifactId>
+  <packaging>bundle</packaging>
+  <name>${project.artifactId}</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>ietf-netconf</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools.model</groupId>
+      <artifactId>ietf-yang-types</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <configuration>
+          <instructions>
+            <Export-Package>
+              org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.*,
+              org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.*,
+              org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.*
+            </Export-Package>
+          </instructions>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yang-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/ietf-netconf-notifications@2012-02-06.yang
new file mode 100644 (file)
index 0000000..a04799d
--- /dev/null
@@ -0,0 +1,363 @@
+module ietf-netconf-notifications {
+
+   namespace
+     "urn:ietf:params:xml:ns:yang:ietf-netconf-notifications";
+
+   prefix ncn;
+
+   import ietf-inet-types { prefix inet; }
+   import ietf-netconf { prefix nc; }
+
+   organization
+     "IETF NETCONF (Network Configuration Protocol) Working Group";
+
+   contact
+     "WG Web:   <http://tools.ietf.org/wg/netconf/>
+      WG List:  <mailto:netconf@ietf.org>
+
+      WG Chair: Bert Wijnen
+                <mailto:bertietf@bwijnen.net>
+
+      WG Chair: Mehmet Ersue
+                <mailto:mehmet.ersue@nsn.com>
+
+      Editor:   Andy Bierman
+                <mailto:andy@netconfcentral.org>";
+
+   description
+     "This module defines a YANG data model for use with the
+      NETCONF protocol that allows the NETCONF client to
+      receive common NETCONF base event notifications.
+
+      Copyright (c) 2012 IETF Trust and the persons identified as
+      the document authors.  All rights reserved.
+
+      Redistribution and use in source and binary forms, with or
+      without modification, is permitted pursuant to, and subject
+      to the license terms contained in, the Simplified BSD License
+
+
+
+      set forth in Section 4.c of the IETF Trust's Legal Provisions
+      Relating to IETF Documents
+      (http://trustee.ietf.org/license-info).
+
+      This version of this YANG module is part of RFC 6470; see
+      the RFC itself for full legal notices.";
+
+   revision "2012-02-06" {
+     description
+       "Initial version. Errata 3957 added.";
+     reference
+       "RFC 6470: NETCONF Base Notifications";
+   }
+
+  grouping common-session-parms {
+    description
+      "Common session parameters to identify a
+       management session.";
+
+    leaf username {
+      type string;
+      mandatory true;
+      description
+        "Name of the user for the session.";
+    }
+
+    leaf session-id {
+      type nc:session-id-or-zero-type;
+      mandatory true;
+      description
+        "Identifier of the session.
+         A NETCONF session MUST be identified by a non-zero value.
+         A non-NETCONF session MAY be identified by the value zero.";
+    }
+
+    leaf source-host {
+      type inet:ip-address;
+      description
+        "Address of the remote host for the session.";
+    }
+  }
+
+
+
+
+
+
+
+
+   grouping changed-by-parms {
+    description
+      "Common parameters to identify the source
+       of a change event, such as a configuration
+       or capability change.";
+
+    container changed-by {
+      description
+        "Indicates the source of the change.
+         If caused by internal action, then the
+         empty leaf 'server' will be present.
+         If caused by a management session, then
+         the name, remote host address, and session ID
+         of the session that made the change will be reported.";
+      choice server-or-user {
+        mandatory true;
+        leaf server {
+          type empty;
+          description
+            "If present, the change was caused
+             by the server.";
+        }
+
+        case by-user {
+          uses common-session-parms;
+        }
+      } // choice server-or-user
+    } // container changed-by-parms
+  }
+
+
+  notification netconf-config-change {
+    description
+      "Generated when the NETCONF server detects that the
+       <running> or <startup> configuration datastore
+       has been changed by a management session.
+       The notification summarizes the edits that
+       have been detected.
+
+       The server MAY choose to also generate this
+       notification while loading a datastore during the
+       boot process for the device.";
+
+    uses changed-by-parms;
+
+
+
+
+
+    leaf datastore {
+      type enumeration {
+        enum running {
+          description "The <running> datastore has changed.";
+        }
+        enum startup {
+          description "The <startup> datastore has changed";
+        }
+      }
+      default "running";
+      description
+        "Indicates which configuration datastore has changed.";
+    }
+
+    list edit {
+      description
+        "An edit record SHOULD be present for each distinct
+         edit operation that the server has detected on
+         the target datastore.  This list MAY be omitted
+         if the detailed edit operations are not known.
+         The server MAY report entries in this list for
+         changes not made by a NETCONF session (e.g., CLI).";
+
+      leaf target {
+        type instance-identifier;
+        description
+          "Topmost node associated with the configuration change.
+           A server SHOULD set this object to the node within
+           the datastore that is being altered.  A server MAY
+           set this object to one of the ancestors of the actual
+           node that was changed, or omit this object, if the
+           exact node is not known.";
+      }
+
+      leaf operation {
+        type nc:edit-operation-type;
+        description
+          "Type of edit operation performed.
+           A server MUST set this object to the NETCONF edit
+           operation performed on the target datastore.";
+      }
+    } // list edit
+  } // notification netconf-config-change
+
+
+
+
+
+
+  notification netconf-capability-change {
+    description
+      "Generated when the NETCONF server detects that
+       the server capabilities have changed.
+       Indicates which capabilities have been added, deleted,
+       and/or modified.  The manner in which a server
+       capability is changed is outside the scope of this
+       document.";
+
+    uses changed-by-parms;
+
+    leaf-list added-capability {
+      type inet:uri;
+      description
+        "List of capabilities that have just been added.";
+    }
+
+    leaf-list deleted-capability {
+      type inet:uri;
+      description
+        "List of capabilities that have just been deleted.";
+    }
+
+    leaf-list modified-capability {
+      type inet:uri;
+      description
+        "List of capabilities that have just been modified.
+         A capability is considered to be modified if the
+         base URI for the capability has not changed, but
+         one or more of the parameters encoded at the end of
+         the capability URI have changed.
+         The new modified value of the complete URI is returned.";
+    }
+  } // notification netconf-capability-change
+
+
+  notification netconf-session-start {
+    description
+      "Generated when a NETCONF server detects that a
+       NETCONF session has started.  A server MAY generate
+       this event for non-NETCONF management sessions.
+       Indicates the identity of the user that started
+       the session.";
+    uses common-session-parms;
+  } // notification netconf-session-start
+
+
+
+
+  notification netconf-session-end {
+    description
+      "Generated when a NETCONF server detects that a
+       NETCONF session has terminated.
+       A server MAY optionally generate this event for
+       non-NETCONF management sessions.  Indicates the
+       identity of the user that owned the session,
+       and why the session was terminated.";
+
+    uses common-session-parms;
+
+    leaf killed-by {
+      when "../termination-reason = 'killed'";
+      type nc:session-id-type;
+      description
+        "The ID of the session that directly caused this session
+         to be abnormally terminated.  If this session was abnormally
+         terminated by a non-NETCONF session unknown to the server,
+         then this leaf will not be present.";
+    }
+
+    leaf termination-reason {
+      type enumeration {
+        enum "closed" {
+          description
+            "The session was terminated by the client in normal
+             fashion, e.g., by the NETCONF <close-session>
+             protocol operation.";
+        }
+        enum "killed" {
+          description
+            "The session was terminated in abnormal
+             fashion, e.g., by the NETCONF <kill-session>
+             protocol operation.";
+        }
+        enum "dropped" {
+          description
+            "The session was terminated because the transport layer
+             connection was unexpectedly closed.";
+        }
+        enum "timeout" {
+          description
+            "The session was terminated because of inactivity,
+             e.g., waiting for the <hello> message or <rpc>
+             messages.";
+        }
+
+
+
+        enum "bad-hello" {
+          description
+            "The client's <hello> message was invalid.";
+        }
+        enum "other" {
+          description
+            "The session was terminated for some other reason.";
+        }
+      }
+      mandatory true;
+      description
+        "Reason the session was terminated.";
+    }
+  } // notification netconf-session-end
+
+
+  notification netconf-confirmed-commit {
+    description
+      "Generated when a NETCONF server detects that a
+       confirmed-commit event has occurred.  Indicates the event
+       and the current state of the confirmed-commit procedure
+       in progress.";
+    reference
+      "RFC 6241, Section 8.4";
+
+    uses common-session-parms {
+      when "confirm-event != 'timeout'";
+    }
+
+    leaf confirm-event {
+      type enumeration {
+        enum "start" {
+          description
+            "The confirmed-commit procedure has started.";
+        }
+        enum "cancel" {
+          description
+            "The confirmed-commit procedure has been canceled,
+             e.g., due to the session being terminated, or an
+             explicit <cancel-commit> operation.";
+        }
+        enum "timeout" {
+          description
+            "The confirmed-commit procedure has been canceled
+             due to the confirm-timeout interval expiring.
+             The common session parameters will not be present
+             in this sub-mode.";
+        }
+
+        enum "extend" {
+          description
+            "The confirmed-commit timeout has been extended,
+             e.g., by a new <confirmed-commit> operation.";
+        }
+        enum "complete" {
+          description
+            "The confirmed-commit procedure has been completed.";
+        }
+      }
+      mandatory true;
+      description
+        "Indicates the event that caused the notification.";
+    }
+
+    leaf timeout {
+      when
+        "../confirm-event = 'start' or ../confirm-event = 'extend'";
+      type uint32;
+      units "seconds";
+      description
+        "The configured timeout value if the event type
+         is 'start' or 'extend'.  This value represents
+         the approximate number of seconds from the event
+         time when the 'timeout' event might occur.";
+    }
+  } // notification netconf-confirmed-commit
+
+}
diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/nc-notifications@2008-07-14.yang
new file mode 100644 (file)
index 0000000..fb9aac1
--- /dev/null
@@ -0,0 +1,95 @@
+module nc-notifications {
+
+    namespace "urn:ietf:params:xml:ns:netmod:notification";
+    prefix "manageEvent";
+
+    import ietf-yang-types{ prefix yang; }
+    import notifications { prefix ncEvent; }
+
+    organization
+      "IETF NETCONF WG";
+
+    contact
+      "netconf@ietf.org";
+
+    description
+      "Conversion of the 'manageEvent' XSD in the NETCONF
+       Notifications RFC.";
+
+    reference
+      "RFC 5277";
+
+    revision 2008-07-14 {
+      description "RFC 5277 version.";
+    }
+
+    container netconf {
+      description "Top-level element in the notification namespace";
+
+      config false;
+
+      container streams {
+        description
+          "The list of event streams supported by the system. When
+           a query is issued, the returned set of streams is
+           determined based on user privileges.";
+
+        list stream {
+          description
+            "Stream name, description and other information.";
+          key name;
+          min-elements 1;
+
+          leaf name {
+            description
+              "The name of the event stream. If this is the default
+               NETCONF stream, this must have the value 'NETCONF'.";
+            type ncEvent:streamNameType;
+          }
+
+          leaf description {
+            description
+              "A description of the event stream, including such
+               information as the type of events that are sent over
+               this stream.";
+            type string;
+            mandatory true;
+          }
+
+          leaf replaySupport {
+            description
+              "A description of the event stream, including such
+               information as the type of events that are sent over
+               this stream.";
+            type boolean;
+            mandatory true;
+          }
+
+          leaf replayLogCreationTime {
+            description
+              "The timestamp of the creation of the log used to support
+               the replay function on this stream. Note that this might
+               be earlier then the earliest available notification in
+               the log. This object is updated if the log resets for
+               some reason.  This object MUST be present if replay is
+               supported.";
+            type yang:date-and-time;   // xsd:dateTime is wrong!
+          }
+        }
+      }
+    }
+
+    notification replayComplete {
+      description
+        "This notification is sent to signal the end of a replay
+         portion of a subscription.";
+    }
+
+    notification notificationComplete {
+      description
+        "This notification is sent to signal the end of a notification
+         subscription. It is sent in the case that stopTime was
+         specified during the creation of the subscription..";
+    }
+
+}
diff --git a/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang b/opendaylight/netconf/ietf-netconf-notifications/src/main/yang/notifications@2008-07-14.yang
new file mode 100644 (file)
index 0000000..f107c2a
--- /dev/null
@@ -0,0 +1,83 @@
+module notifications {
+
+    namespace "urn:ietf:params:xml:ns:netconf:notification:1.0";
+    prefix "ncEvent";
+
+    import ietf-yang-types { prefix yang; }
+
+    organization
+      "IETF NETCONF WG";
+
+    contact
+      "netconf@ops.ietf.org";
+
+    description
+      "Conversion of the 'ncEvent' XSD in the
+       NETCONF Notifications RFC.";
+
+    reference
+      "RFC 5277.";
+
+    revision 2008-07-14 {
+      description "RFC 5277 version.";
+    }
+
+    typedef streamNameType {
+      description
+        "The name of an event stream.";
+      type string;
+    }
+
+    rpc create-subscription {
+      description
+        "The command to create a notification subscription. It
+         takes as argument the name of the notification stream
+         and filter. Both of those options limit the content of
+         the subscription. In addition, there are two time-related
+         parameters, startTime and stopTime, which can be used to
+         select the time interval of interest to the notification
+         replay feature.";
+
+      input {
+        leaf stream {
+          description
+            "An optional parameter that indicates which stream of events
+             is of interest. If not present, then events in the default
+             NETCONF stream will be sent.";
+          type streamNameType;
+          default "NETCONF";
+        }
+
+        anyxml filter {
+          description
+            "An optional parameter that indicates which subset of all
+             possible events is of interest. The format of this
+             parameter is the same as that of the filter parameter
+             in the NETCONF protocol operations. If not present,
+             all events not precluded by other parameters will
+             be sent.";
+        }
+
+        leaf startTime {
+          description
+            "A parameter used to trigger the replay feature and
+             indicates that the replay should start at the time
+             specified. If start time is not present, this is not a
+             replay subscription.";
+          type yang:date-and-time;
+        }
+
+        leaf stopTime {
+          // must ". >= ../startTime";
+          description
+            "An optional parameter used with the optional replay
+             feature to indicate the newest notifications of
+             interest. If stop time is not present, the notifications
+             will continue until the subscription is terminated.
+             Must be used with startTime.";
+          type yang:date-and-time;
+        }
+      }
+    }
+}
+
index eb3cac18df4700b17e01a641d5e60b63914130da..3487aa7be35975bf947b5eda372fd3e1ec964f85 100644 (file)
                 <version>${project.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>ietf-netconf</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>ietf-netconf-monitoring</artifactId>
                 <version>${project.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>ietf-netconf-notifications</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>netconf-notifications-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>netconf-notifications-impl</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>netconf-client</artifactId>
diff --git a/opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/SessionAwareNetconfOperation.java b/opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/SessionAwareNetconfOperation.java
new file mode 100644 (file)
index 0000000..88c77c6
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.mapping.api;
+
+import org.opendaylight.controller.netconf.api.NetconfSession;
+
+public interface SessionAwareNetconfOperation extends NetconfOperation {
+
+    void setSession(NetconfSession session);
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/pom.xml b/opendaylight/netconf/netconf-notifications-api/pom.xml
new file mode 100644 (file)
index 0000000..a1fbe15
--- /dev/null
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>netconf-subsystem</artifactId>
+        <groupId>org.opendaylight.controller</groupId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>bundle</packaging>
+    <artifactId>netconf-notifications-api</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>netconf-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>ietf-netconf-notifications</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Export-Package>org.opendaylight.controller.netconf.notifications.*</Export-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/BaseNetconfNotificationListener.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/BaseNetconfNotificationListener.java
new file mode 100644 (file)
index 0000000..899ab85
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+
+
+/**
+ * Listener for base netconf notifications defined in https://tools.ietf.org/html/rfc6470.
+ * This listener uses generated classes from yang model defined in RFC6470.
+ * It alleviates the provisioning of base netconf notifications from the code.
+ */
+public interface BaseNetconfNotificationListener {
+
+    /**
+     * Callback used to notify about a change in used capabilities
+     */
+    void onCapabilityChanged(NetconfCapabilityChange capabilityChange);
+
+    // TODO add other base notifications
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/BaseNotificationPublisherRegistration.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/BaseNotificationPublisherRegistration.java
new file mode 100644 (file)
index 0000000..7755fc5
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Registration for base notification publisher. This registration allows for publishing of base netconf notifications using generated classes
+ */
+public interface BaseNotificationPublisherRegistration extends NotificationRegistration, BaseNetconfNotificationListener {
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotification.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotification.java
new file mode 100644 (file)
index 0000000..efa42c0
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import com.google.common.base.Preconditions;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Special kind of netconf message that contains a timestamp.
+ */
+public final class NetconfNotification extends NetconfMessage {
+
+    public static final String NOTIFICATION = "notification";
+    public static final String NOTIFICATION_NAMESPACE = "urn:ietf:params:netconf:capability:notification:1.0";
+    public static final String RFC3339_DATE_FORMAT_BLUEPRINT = "yyyy-MM-dd'T'HH:mm:ssXXX";
+    public static final String EVENT_TIME = "eventTime";
+
+    /**
+     * Create new notification and capture the timestamp in the constructor
+     */
+    public NetconfNotification(final Document notificationContent) {
+        this(notificationContent, new Date());
+    }
+
+    /**
+     * Create new notification with provided timestamp
+     */
+    public NetconfNotification(final Document notificationContent, final Date eventTime) {
+        super(wrapNotification(notificationContent, eventTime));
+    }
+
+    private static Document wrapNotification(final Document notificationContent, final Date eventTime) {
+        Preconditions.checkNotNull(notificationContent);
+        Preconditions.checkNotNull(eventTime);
+
+        final Element baseNotification = notificationContent.getDocumentElement();
+        final Element entireNotification = notificationContent.createElementNS(NOTIFICATION_NAMESPACE, NOTIFICATION);
+        entireNotification.appendChild(baseNotification);
+
+        final Element eventTimeElement = notificationContent.createElementNS(NOTIFICATION_NAMESPACE, EVENT_TIME);
+        eventTimeElement.setTextContent(getSerializedEventTime(eventTime));
+        entireNotification.appendChild(eventTimeElement);
+
+        notificationContent.appendChild(entireNotification);
+        return notificationContent;
+    }
+
+    private static String getSerializedEventTime(final Date eventTime) {
+        // SimpleDateFormat is not threadsafe, cannot be in a constant
+        return new SimpleDateFormat(RFC3339_DATE_FORMAT_BLUEPRINT).format(eventTime);
+    }
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationCollector.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationCollector.java
new file mode 100644 (file)
index 0000000..2663a5d
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+
+/**
+ * Collector of all notifications. Base or generic
+ */
+public interface NetconfNotificationCollector  {
+
+    /**
+     * Add notification publisher for a particular stream
+     *
+     * Implementations should allow for multiple publishers of a single stream
+     * and its up to implementations to decide how to merge metadata (e.g. description)
+     * for the same stream when providing information about available stream
+     *
+     */
+    NotificationPublisherRegistration registerNotificationPublisher(Stream stream);
+
+    /**
+     * Register base notification publisher
+     */
+    BaseNotificationPublisherRegistration registerBaseNotificationPublisher();
+
+    /**
+     * Users of the registry have an option to get notification each time new notification stream gets registered
+     * This allows for a push model in addition to pull model for retrieving information about available streams.
+     *
+     * The listener should receive callbacks for each stream available prior to the registration when its registered
+     */
+    NotificationRegistration registerStreamListener(NetconfNotificationStreamListener listener);
+
+    /**
+     * Simple listener that receives notifications about changes in stream availability
+     */
+    public interface NetconfNotificationStreamListener {
+
+        /**
+         * Stream becomes available in the collector (first publisher is registered)
+         */
+        void onStreamRegistered(Stream stream);
+
+        /**
+         * Stream is not available anymore in the collector (last publisher is unregistered)
+         */
+        void onStreamUnregistered(StreamNameType stream);
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationListener.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationListener.java
new file mode 100644 (file)
index 0000000..e1da05c
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+
+/**
+ * Generic listener for netconf notifications
+ */
+public interface NetconfNotificationListener {
+
+    /**
+     * Callback used to notify the listener about any new notification
+     */
+    void onNotification(StreamNameType stream, NetconfNotification notification);
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationRegistry.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NetconfNotificationRegistry.java
new file mode 100644 (file)
index 0000000..db2443e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+
+/**
+ *
+ */
+public interface NetconfNotificationRegistry {
+
+    /**
+     * Add listener for a certain notification type
+     */
+    NotificationListenerRegistration registerNotificationListener(StreamNameType stream, NetconfNotificationListener listener);
+
+    /**
+     * Check stream availability
+     */
+    boolean isStreamAvailable(StreamNameType streamNameType);
+
+    /**
+     * Get all the streams available
+     */
+    Streams getNotificationPublishers();
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationListenerRegistration.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationListenerRegistration.java
new file mode 100644 (file)
index 0000000..aa81612
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Manages the registration of a single listener
+ */
+public interface NotificationListenerRegistration extends NotificationRegistration {
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationPublisherRegistration.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationPublisherRegistration.java
new file mode 100644 (file)
index 0000000..de105fc
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Registration for notification publisher. This registration allows for publishing any netconf notifications
+ */
+public interface NotificationPublisherRegistration extends NetconfNotificationListener, NotificationRegistration {
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationRegistration.java b/opendaylight/netconf/netconf-notifications-api/src/main/java/org/opendaylight/controller/netconf/notifications/NotificationRegistration.java
new file mode 100644 (file)
index 0000000..a7a86a4
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Generic registration, used as a base for other registration types
+ */
+public interface NotificationRegistration extends AutoCloseable {
+
+    // Overriden close does not throw any kind of checked exception
+
+    /**
+     * Close the registration.
+     */
+    @Override
+    void close();
+}
diff --git a/opendaylight/netconf/netconf-notifications-impl/pom.xml b/opendaylight/netconf/netconf-notifications-impl/pom.xml
new file mode 100644 (file)
index 0000000..510d9f0
--- /dev/null
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>netconf-subsystem</artifactId>
+        <groupId>org.opendaylight.controller</groupId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>bundle</packaging>
+    <artifactId>netconf-notifications-impl</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>netconf-notifications-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>netconf-util</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>binding-generator-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>binding-data-codec</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>xmlunit</groupId>
+            <artifactId>xmlunit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>mockito-configuration</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Bundle-Activator>org.opendaylight.controller.netconf.notifications.impl.osgi.Activator</Bundle-Activator>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManager.java b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManager.java
new file mode 100644 (file)
index 0000000..d2dbcaf
--- /dev/null
@@ -0,0 +1,283 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.controller.netconf.notifications.NotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NotificationRegistration;
+import org.opendaylight.controller.netconf.notifications.impl.ops.NotificationsTransformUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ThreadSafe
+public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry, NetconfNotificationListener, AutoCloseable {
+
+    public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
+    public static final Stream BASE_NETCONF_STREAM;
+
+    static {
+        BASE_NETCONF_STREAM = new StreamBuilder()
+                .setName(BASE_STREAM_NAME)
+                .setKey(new StreamKey(BASE_STREAM_NAME))
+                .setReplaySupport(false)
+                .setDescription("Default Event Stream")
+                .build();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
+
+    // TODO excessive synchronization provides thread safety but is most likely not optimal (combination of concurrent collections might improve performance)
+    // And also calling callbacks from a synchronized block is dangerous since the listeners/publishers can block the whole notification processing
+
+    @GuardedBy("this")
+    private final Multimap<StreamNameType, GenericNotificationListenerReg> notificationListeners = HashMultimap.create();
+
+    @GuardedBy("this")
+    private final Set<NetconfNotificationStreamListener> streamListeners = Sets.newHashSet();
+
+    @GuardedBy("this")
+    private final Map<StreamNameType, Stream> streamMetadata = Maps.newHashMap();
+
+    @GuardedBy("this")
+    private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
+
+    @GuardedBy("this")
+    private final Set<GenericNotificationPublisherReg> notificationPublishers = Sets.newHashSet();
+
+    @Override
+    public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) {
+        LOG.debug("Notification of type {} detected", stream);
+        if(LOG.isTraceEnabled()) {
+            LOG.debug("Notification of type {} detected: {}", stream, notification);
+        }
+
+        for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
+            listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
+        }
+    }
+
+    @Override
+    public synchronized NotificationListenerRegistration registerNotificationListener(final StreamNameType stream, final NetconfNotificationListener listener) {
+        Preconditions.checkNotNull(stream);
+        Preconditions.checkNotNull(listener);
+
+        LOG.trace("Notification listener registered for stream: {}", stream);
+
+        final GenericNotificationListenerReg genericNotificationListenerReg = new GenericNotificationListenerReg(listener) {
+            @Override
+            public void close() {
+                synchronized (NetconfNotificationManager.this) {
+                    LOG.trace("Notification listener unregistered for stream: {}", stream);
+                    super.close();
+                }
+            }
+        };
+
+        notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg);
+        return genericNotificationListenerReg;
+    }
+
+    @Override
+    public synchronized Streams getNotificationPublishers() {
+        return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build();
+    }
+
+    @Override
+    public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
+        return availableStreams.contains(streamNameType);
+    }
+
+    @Override
+    public synchronized NotificationRegistration registerStreamListener(final NetconfNotificationStreamListener listener) {
+        streamListeners.add(listener);
+
+        // Notify about all already available
+        for (final Stream availableStream : streamMetadata.values()) {
+            listener.onStreamRegistered(availableStream);
+        }
+
+        return new NotificationRegistration() {
+            @Override
+            public void close() {
+                synchronized(NetconfNotificationManager.this) {
+                    streamListeners.remove(listener);
+                }
+            }
+        };
+    }
+
+    @Override
+    public synchronized void close() {
+        // Unregister all listeners
+        for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
+            genericNotificationListenerReg.close();
+        }
+        notificationListeners.clear();
+
+        // Unregister all publishers
+        for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
+            notificationPublisher.close();
+        }
+        notificationPublishers.clear();
+
+        // Clear stream Listeners
+        streamListeners.clear();
+    }
+
+    @Override
+    public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
+        Preconditions.checkNotNull(stream);
+        final StreamNameType streamName = stream.getName();
+
+        LOG.debug("Notification publisher registered for stream: {}", streamName);
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("Notification publisher registered for stream: {}", stream);
+        }
+
+        if(streamMetadata.containsKey(streamName)) {
+            LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, streamMetadata.get(streamName));
+        } else {
+            streamMetadata.put(streamName, stream);
+        }
+
+        availableStreams.add(streamName);
+
+        final GenericNotificationPublisherReg genericNotificationPublisherReg = new GenericNotificationPublisherReg(this, streamName) {
+            @Override
+            public void close() {
+                synchronized (NetconfNotificationManager.this) {
+                    super.close();
+                }
+            }
+        };
+
+        notificationPublishers.add(genericNotificationPublisherReg);
+
+        notifyStreamAdded(stream);
+        return genericNotificationPublisherReg;
+    }
+
+    private void unregisterNotificationPublisher(final StreamNameType streamName, final GenericNotificationPublisherReg genericNotificationPublisherReg) {
+        availableStreams.remove(streamName);
+        notificationPublishers.remove(genericNotificationPublisherReg);
+
+        LOG.debug("Notification publisher unregistered for stream: {}", streamName);
+
+        // Notify stream listeners if all publishers are gone and also clear metadata for stream
+        if (!isStreamAvailable(streamName)) {
+            LOG.debug("Notification stream: {} became unavailable", streamName);
+            streamMetadata.remove(streamName);
+            notifyStreamRemoved(streamName);
+        }
+    }
+
+    private synchronized void notifyStreamAdded(final Stream stream) {
+        for (final NetconfNotificationStreamListener streamListener : streamListeners) {
+            streamListener.onStreamRegistered(stream);
+        }
+    }
+    private synchronized void notifyStreamRemoved(final StreamNameType stream) {
+        for (final NetconfNotificationStreamListener streamListener : streamListeners) {
+            streamListener.onStreamUnregistered(stream);
+        }
+    }
+
+    @Override
+    public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
+        final NotificationPublisherRegistration notificationPublisherRegistration = registerNotificationPublisher(BASE_NETCONF_STREAM);
+        return new BaseNotificationPublisherReg(notificationPublisherRegistration);
+    }
+
+    private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
+        private NetconfNotificationManager baseListener;
+        private final StreamNameType registeredStream;
+
+        public GenericNotificationPublisherReg(final NetconfNotificationManager baseListener, final StreamNameType registeredStream) {
+            this.baseListener = baseListener;
+            this.registeredStream = registeredStream;
+        }
+
+        @Override
+        public void close() {
+            baseListener.unregisterNotificationPublisher(registeredStream, this);
+            baseListener = null;
+        }
+
+        @Override
+        public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
+            Preconditions.checkState(baseListener != null, "Already closed");
+            Preconditions.checkArgument(stream.equals(registeredStream));
+            baseListener.onNotification(stream, notification);
+        }
+    }
+
+    private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration {
+
+        private final NotificationPublisherRegistration baseRegistration;
+
+        public BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) {
+            this.baseRegistration = baseRegistration;
+        }
+
+        @Override
+        public void close() {
+            baseRegistration.close();
+        }
+
+        @Override
+        public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
+            baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(capabilityChange));
+        }
+
+        private static NetconfNotification serializeNotification(final NetconfCapabilityChange capabilityChange) {
+            return NotificationsTransformUtil.transform(capabilityChange);
+        }
+    }
+
+    private class GenericNotificationListenerReg implements NotificationListenerRegistration {
+        private final NetconfNotificationListener listener;
+
+        public GenericNotificationListenerReg(final NetconfNotificationListener listener) {
+            this.listener = listener;
+        }
+
+        public NetconfNotificationListener getListener() {
+            return listener;
+        }
+
+        @Override
+        public void close() {
+            notificationListeners.remove(BASE_STREAM_NAME, this);
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/CreateSubscription.java b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/CreateSubscription.java
new file mode 100644 (file)
index 0000000..e8b7413
--- /dev/null
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.SessionAwareNetconfOperation;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.controller.netconf.notifications.impl.NetconfNotificationManager;
+import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Create subscription listens for create subscription requests and registers notification listeners into notification registry.
+ * Received notifications are sent to the client right away
+ */
+public class CreateSubscription extends AbstractLastNetconfOperation implements SessionAwareNetconfOperation, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
+
+    static final String CREATE_SUBSCRIPTION = "create-subscription";
+
+    private final NetconfNotificationRegistry notifications;
+    private final List<NotificationListenerRegistration> subscriptions = Lists.newArrayList();
+    private NetconfSession netconfSession;
+
+    public CreateSubscription(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notifications) {
+        super(netconfSessionIdForReporting);
+        this.notifications = notifications;
+    }
+
+    @Override
+    protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+        operationElement.checkName(CREATE_SUBSCRIPTION);
+        operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
+        // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
+        // Waiting ofr https://git.opendaylight.org/gerrit/#/c/13763/
+
+        // FIXME filter could be supported same way as netconf server filters get and get-config results
+        final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
+        Preconditions.checkArgument(filter.isPresent() == false, "Filter element not yet supported");
+
+        // Replay not supported
+        final Optional<XmlElement> startTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
+        Preconditions.checkArgument(startTime.isPresent() == false, "StartTime element not yet supported");
+
+        // Stop time not supported
+        final Optional<XmlElement> stopTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
+        Preconditions.checkArgument(stopTime.isPresent() == false, "StopTime element not yet supported");
+
+        final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
+
+        Preconditions.checkNotNull(netconfSession);
+        // Premature streams are allowed (meaning listener can register even if no provider is available yet)
+        if(notifications.isStreamAvailable(streamNameType) == false) {
+            LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType, getNetconfSessionIdForReporting());
+        }
+
+        final NotificationListenerRegistration notificationListenerRegistration =
+                notifications.registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession));
+        subscriptions.add(notificationListenerRegistration);
+
+        return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
+    }
+
+    private StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws NetconfDocumentedException {
+        final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
+        return stream.isPresent() ? new StreamNameType(stream.get().getTextContent()) : NetconfNotificationManager.BASE_STREAM_NAME;
+    }
+
+    @Override
+    protected String getOperationName() {
+        return CREATE_SUBSCRIPTION;
+    }
+
+    @Override
+    protected String getOperationNamespace() {
+        return CreateSubscriptionInput.QNAME.getNamespace().toString();
+    }
+
+    @Override
+    public void setSession(final NetconfSession session) {
+        this.netconfSession = session;
+    }
+
+    @Override
+    public void close() {
+        netconfSession = null;
+        // Unregister from notification streams
+        for (final NotificationListenerRegistration subscription : subscriptions) {
+            subscription.close();
+        }
+    }
+
+    private static class NotificationSubscription implements NetconfNotificationListener {
+        private final NetconfSession currentSession;
+
+        public NotificationSubscription(final NetconfSession currentSession) {
+            this.currentSession = currentSession;
+        }
+
+        @Override
+        public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
+            currentSession.sendMessage(notification);
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/Get.java b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/Get.java
new file mode 100644 (file)
index 0000000..85f2936
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.NetconfBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Serialize the subtree for netconf notifications into the response of get rpc.
+ * This operation just adds its subtree into the common response of get rpc.
+ */
+public class Get extends AbstractNetconfOperation implements AutoCloseable {
+
+    private static final String GET = "get";
+    private static final InstanceIdentifier<Netconf> NETCONF_SUBTREE_INSTANCE_IDENTIFIER = InstanceIdentifier.builder(Netconf.class).build();
+
+    private final NetconfNotificationRegistry notificationRegistry;
+
+    public Get(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notificationRegistry) {
+        super(netconfSessionIdForReporting);
+        Preconditions.checkNotNull(notificationRegistry);
+        this.notificationRegistry = notificationRegistry;
+    }
+
+    @Override
+    protected String getOperationName() {
+        return GET;
+    }
+
+    @Override
+    public Document handle(final Document requestMessage, final NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
+        final Document partialResponse = subsequentOperation.execute(requestMessage);
+        final Streams availableStreams = notificationRegistry.getNotificationPublishers();
+        if(availableStreams.getStream().isEmpty() == false) {
+            serializeStreamsSubtree(partialResponse, availableStreams);
+        }
+        return partialResponse;
+    }
+
+    static void serializeStreamsSubtree(final Document partialResponse, final Streams availableStreams) throws NetconfDocumentedException {
+        final Netconf netconfSubtree = new NetconfBuilder().setStreams(availableStreams).build();
+        final NormalizedNode<?, ?> normalized = toNormalized(netconfSubtree);
+
+        final DOMResult result = new DOMResult(getPlaceholder(partialResponse));
+
+        try {
+            NotificationsTransformUtil.writeNormalizedNode(normalized, result, SchemaPath.ROOT);
+        } catch (final XMLStreamException | IOException e) {
+            throw new IllegalStateException("Unable to serialize " + netconfSubtree, e);
+        }
+    }
+
+    private static Element getPlaceholder(final Document innerResult)
+            throws NetconfDocumentedException {
+        final XmlElement rootElement = XmlElement.fromDomElementWithExpected(
+                innerResult.getDocumentElement(), XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.RFC4741_TARGET_NAMESPACE);
+        return rootElement.getOnlyChildElement(XmlNetconfConstants.DATA_KEY).getDomElement();
+    }
+
+    private static NormalizedNode<?, ?> toNormalized(final Netconf netconfSubtree) {
+        return NotificationsTransformUtil.CODEC_REGISTRY.toNormalizedNode(NETCONF_SUBTREE_INSTANCE_IDENTIFIER, netconfSubtree).getValue();
+    }
+
+    @Override
+    protected Element handle(final Document document, final XmlElement message, final NetconfOperationChainedExecution subsequentOperation)
+            throws NetconfDocumentedException {
+        throw new UnsupportedOperationException("Never gets called");
+    }
+
+    @Override
+    protected HandlingPriority getHandlingPriority() {
+        return HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY.increasePriority(2);
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/NotificationsTransformUtil.java b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/ops/NotificationsTransformUtil.java
new file mode 100644 (file)
index 0000000..080176d
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import javassist.ClassPool;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.dom.DOMResult;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+public final class NotificationsTransformUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationsTransformUtil.class);
+
+    private NotificationsTransformUtil() {}
+
+    static final SchemaContext NOTIFICATIONS_SCHEMA_CTX;
+    static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
+    static final XMLOutputFactory XML_FACTORY;
+    static final RpcDefinition CREATE_SUBSCRIPTION_RPC;
+
+    static final SchemaPath CAPABILITY_CHANGE_SCHEMA_PATH = SchemaPath.create(true, NetconfCapabilityChange.QNAME);
+
+    static {
+        XML_FACTORY = XMLOutputFactory.newFactory();
+        XML_FACTORY.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
+
+        final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+        moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
+        moduleInfoBackedContext.addModuleInfos(Collections.singletonList(org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.$YangModuleInfoImpl.getInstance()));
+        final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
+        Preconditions.checkState(schemaContextOptional.isPresent());
+        NOTIFICATIONS_SCHEMA_CTX = schemaContextOptional.get();
+
+        CREATE_SUBSCRIPTION_RPC = Preconditions.checkNotNull(findCreateSubscriptionRpc());
+
+        Preconditions.checkNotNull(CREATE_SUBSCRIPTION_RPC);
+
+        final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
+        CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
+        CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, NOTIFICATIONS_SCHEMA_CTX));
+    }
+
+    private static RpcDefinition findCreateSubscriptionRpc() {
+        return Iterables.getFirst(Collections2.filter(NOTIFICATIONS_SCHEMA_CTX.getOperations(), new Predicate<RpcDefinition>() {
+            @Override
+            public boolean apply(final RpcDefinition input) {
+                return input.getQName().getLocalName().equals(CreateSubscription.CREATE_SUBSCRIPTION);
+            }
+        }), null);
+    }
+
+    /**
+     * Transform base notification for capabilities into NetconfNotification
+     */
+    public static NetconfNotification transform(final NetconfCapabilityChange capabilityChange) {
+        return transform(capabilityChange, Optional.<Date>absent());
+    }
+
+    public static NetconfNotification transform(final NetconfCapabilityChange capabilityChange, final Date eventTime) {
+        return transform(capabilityChange, Optional.fromNullable(eventTime));
+    }
+
+    private static NetconfNotification transform(final NetconfCapabilityChange capabilityChange, final Optional<Date> eventTime) {
+        final ContainerNode containerNode = CODEC_REGISTRY.toNormalizedNodeNotification(capabilityChange);
+        final DOMResult result = new DOMResult(XmlUtil.newDocument());
+        try {
+            writeNormalizedNode(containerNode, result, CAPABILITY_CHANGE_SCHEMA_PATH);
+        } catch (final XMLStreamException| IOException e) {
+            throw new IllegalStateException("Unable to serialize " + capabilityChange, e);
+        }
+        final Document node = (Document) result.getNode();
+        return eventTime.isPresent() ?
+                new NetconfNotification(node, eventTime.get()):
+                new NetconfNotification(node);
+    }
+
+    static void writeNormalizedNode(final NormalizedNode<?, ?> normalized, final DOMResult result, final SchemaPath schemaPath) throws IOException, XMLStreamException {
+        NormalizedNodeWriter normalizedNodeWriter = null;
+        NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+        XMLStreamWriter writer = null;
+        try {
+            writer = XML_FACTORY.createXMLStreamWriter(result);
+            normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, NOTIFICATIONS_SCHEMA_CTX, schemaPath);
+            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+            normalizedNodeWriter.write(normalized);
+
+            normalizedNodeWriter.flush();
+        } finally {
+            try {
+                if(normalizedNodeWriter != null) {
+                    normalizedNodeWriter.close();
+                }
+                if(normalizedNodeStreamWriter != null) {
+                    normalizedNodeStreamWriter.close();
+                }
+                if(writer != null) {
+                    writer.close();
+                }
+            } catch (final Exception e) {
+                LOG.warn("Unable to close resource properly", e);
+            }
+        }
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/osgi/Activator.java b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/osgi/Activator.java
new file mode 100644 (file)
index 0000000..ef950f8
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.osgi;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.Set;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.controller.netconf.notifications.impl.NetconfNotificationManager;
+import org.opendaylight.controller.netconf.notifications.impl.ops.CreateSubscription;
+import org.opendaylight.controller.netconf.notifications.impl.ops.Get;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+public class Activator implements BundleActivator {
+
+    private ServiceRegistration<NetconfNotificationCollector> netconfNotificationCollectorServiceRegistration;
+    private ServiceRegistration<NetconfOperationServiceFactory> operationaServiceRegistration;
+    private NetconfNotificationManager netconfNotificationManager;
+
+    @Override
+    public void start(final BundleContext context) throws Exception {
+        netconfNotificationManager = new NetconfNotificationManager();
+        netconfNotificationCollectorServiceRegistration = context.registerService(NetconfNotificationCollector.class, netconfNotificationManager, new Hashtable<String, Object>());
+
+        final NetconfOperationServiceFactory netconfOperationServiceFactory = new NetconfOperationServiceFactory() {
+
+            @Override
+            public NetconfOperationService createService(final String netconfSessionIdForReporting) {
+                return new NetconfOperationService() {
+
+                    private final CreateSubscription createSubscription = new CreateSubscription(netconfSessionIdForReporting, netconfNotificationManager);
+
+                    @Override
+                    public Set<Capability> getCapabilities() {
+                        return Collections.<Capability>singleton(new NotificationsCapability());
+                    }
+
+                    @Override
+                    public Set<NetconfOperation> getNetconfOperations() {
+                        return Sets.<NetconfOperation>newHashSet(
+                                new Get(netconfSessionIdForReporting, netconfNotificationManager),
+                                createSubscription);
+                    }
+
+                    @Override
+                    public void close() {
+                        createSubscription.close();
+                    }
+                };
+            }
+        };
+
+        operationaServiceRegistration = context.registerService(NetconfOperationServiceFactory.class, netconfOperationServiceFactory, new Hashtable<String, Object>());
+
+    }
+
+    @Override
+    public void stop(final BundleContext context) throws Exception {
+        if(netconfNotificationCollectorServiceRegistration != null) {
+            netconfNotificationCollectorServiceRegistration.unregister();
+            netconfNotificationCollectorServiceRegistration = null;
+        }
+        if (netconfNotificationManager != null) {
+            netconfNotificationManager.close();
+        }
+        if (operationaServiceRegistration != null) {
+            operationaServiceRegistration.unregister();
+            operationaServiceRegistration = null;
+        }
+    }
+
+    private class NotificationsCapability implements Capability {
+        @Override
+        public String getCapabilityUri() {
+            return NetconfNotification.NOTIFICATION_NAMESPACE;
+        }
+
+        @Override
+        public Optional<String> getModuleNamespace() {
+            return Optional.absent();
+        }
+
+        @Override
+        public Optional<String> getModuleName() {
+            return Optional.absent();
+        }
+
+        @Override
+        public Optional<String> getRevision() {
+            return Optional.absent();
+        }
+
+        @Override
+        public Optional<String> getCapabilitySchema() {
+            return Optional.absent();
+        }
+
+        @Override
+        public Collection<String> getLocation() {
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManagerTest.java b/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManagerTest.java
new file mode 100644 (file)
index 0000000..36d2015
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+
+public class NetconfNotificationManagerTest {
+
+    @Mock
+    private NetconfNotificationRegistry notificationRegistry;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testNotificationListeners() throws Exception {
+        final NetconfNotificationManager netconfNotificationManager = new NetconfNotificationManager();
+        final BaseNotificationPublisherRegistration baseNotificationPublisherRegistration =
+                netconfNotificationManager.registerBaseNotificationPublisher();
+
+        final NetconfCapabilityChangeBuilder capabilityChangedBuilder = new NetconfCapabilityChangeBuilder();
+
+        final NetconfNotificationListener listener = mock(NetconfNotificationListener.class);
+        doNothing().when(listener).onNotification(any(StreamNameType.class), any(NetconfNotification.class));
+        final NotificationListenerRegistration notificationListenerRegistration = netconfNotificationManager.registerNotificationListener(NetconfNotificationManager.BASE_NETCONF_STREAM.getName(), listener);
+        final NetconfCapabilityChange notification = capabilityChangedBuilder.build();
+        baseNotificationPublisherRegistration.onCapabilityChanged(notification);
+
+        verify(listener).onNotification(any(StreamNameType.class), any(NetconfNotification.class));
+
+        notificationListenerRegistration.close();
+
+        baseNotificationPublisherRegistration.onCapabilityChanged(notification);
+        verifyNoMoreInteractions(listener);
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        final NetconfNotificationManager netconfNotificationManager = new NetconfNotificationManager();
+
+        final BaseNotificationPublisherRegistration baseNotificationPublisherRegistration = netconfNotificationManager.registerBaseNotificationPublisher();
+
+        final NetconfNotificationListener listener = mock(NetconfNotificationListener.class);
+        doNothing().when(listener).onNotification(any(StreamNameType.class), any(NetconfNotification.class));
+
+        netconfNotificationManager.registerNotificationListener(NetconfNotificationManager.BASE_NETCONF_STREAM.getName(), listener);
+
+        final NetconfNotificationCollector.NetconfNotificationStreamListener streamListener =
+                mock(NetconfNotificationCollector.NetconfNotificationStreamListener.class);
+        doNothing().when(streamListener).onStreamUnregistered(any(StreamNameType.class));
+        doNothing().when(streamListener).onStreamRegistered(any(Stream.class));
+        netconfNotificationManager.registerStreamListener(streamListener);
+
+        verify(streamListener).onStreamRegistered(NetconfNotificationManager.BASE_NETCONF_STREAM);
+
+        netconfNotificationManager.close();
+
+        verify(streamListener).onStreamUnregistered(NetconfNotificationManager.BASE_NETCONF_STREAM.getName());
+
+        try {
+            baseNotificationPublisherRegistration.onCapabilityChanged(new NetconfCapabilityChangeBuilder().build());
+        } catch (final IllegalStateException e) {
+            // Exception should be thrown after manager is closed
+            return;
+        }
+
+        fail("Publishing into a closed manager should fail");
+    }
+
+    @Test
+    public void testStreamListeners() throws Exception {
+        final NetconfNotificationManager netconfNotificationManager = new NetconfNotificationManager();
+
+        final NetconfNotificationCollector.NetconfNotificationStreamListener streamListener = mock(NetconfNotificationCollector.NetconfNotificationStreamListener.class);
+        doNothing().when(streamListener).onStreamRegistered(any(Stream.class));
+        doNothing().when(streamListener).onStreamUnregistered(any(StreamNameType.class));
+
+        netconfNotificationManager.registerStreamListener(streamListener);
+
+        final BaseNotificationPublisherRegistration baseNotificationPublisherRegistration =
+                netconfNotificationManager.registerBaseNotificationPublisher();
+
+        verify(streamListener).onStreamRegistered(NetconfNotificationManager.BASE_NETCONF_STREAM);
+
+
+        baseNotificationPublisherRegistration.close();
+
+        verify(streamListener).onStreamUnregistered(NetconfNotificationManager.BASE_STREAM_NAME);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/CreateSubscriptionTest.java b/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/CreateSubscriptionTest.java
new file mode 100644 (file)
index 0000000..aca8f2d
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.w3c.dom.Element;
+
+public class CreateSubscriptionTest {
+
+    private static final String CREATE_SUBSCRIPTION_XML = "<create-subscription\n" +
+            "xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\" xmlns:netconf=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+            "<stream>TESTSTREAM</stream>" +
+            "</create-subscription>";
+
+    @Mock
+    private NetconfNotificationRegistry notificationRegistry;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        doReturn(true).when(notificationRegistry).isStreamAvailable(any(StreamNameType.class));
+        doReturn(mock(NotificationListenerRegistration.class)).when(notificationRegistry).registerNotificationListener(any(StreamNameType.class), any(NetconfNotificationListener.class));
+    }
+
+    @Test
+    public void testHandleWithNoSubsequentOperations() throws Exception {
+        final CreateSubscription createSubscription = new CreateSubscription("id", notificationRegistry);
+        createSubscription.setSession(mock(NetconfSession.class));
+
+        final Element e = XmlUtil.readXmlToElement(CREATE_SUBSCRIPTION_XML);
+
+        final XmlElement operationElement = XmlElement.fromDomElement(e);
+        final Element element = createSubscription.handleWithNoSubsequentOperations(XmlUtil.newDocument(), operationElement);
+
+        Assert.assertThat(XmlUtil.toString(element), CoreMatchers.containsString("ok"));
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/GetTest.java b/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/GetTest.java
new file mode 100644 (file)
index 0000000..6f38f24
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.notifications.impl.ops.Get;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+public class GetTest {
+
+    @Test
+    public void testSerializeStreamsSubtree() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final StreamBuilder streamBuilder = new StreamBuilder();
+        final StreamNameType base = new StreamNameType("base");
+        streamBuilder.setName(base);
+        streamBuilder.setKey(new StreamKey(base));
+        streamBuilder.setDescription("description");
+        streamBuilder.setReplaySupport(false);
+        streamsBuilder.setStream(Lists.newArrayList(streamBuilder.build()));
+        final Streams streams = streamsBuilder.build();
+
+        final Document response = getBlankResponse();
+        Get.serializeStreamsSubtree(response, streams);
+        final Diff diff = XMLUnit.compareXML(XmlUtil.toString(response),
+                "<rpc-reply message-id=\"101\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                "<data>\n" +
+                "<netconf xmlns=\"urn:ietf:params:xml:ns:netmod:notification\">\n" +
+                "<streams>\n" +
+                "<stream>\n" +
+                "<name>base</name>\n" +
+                "<description>description</description>\n" +
+                "<replaySupport>false</replaySupport>\n" +
+                "</stream>\n" +
+                "</streams>\n" +
+                "</netconf>\n" +
+                "</data>\n" +
+                "</rpc-reply>\n");
+
+        assertTrue(diff.toString(), diff.identical());
+    }
+
+    private Document getBlankResponse() throws IOException, SAXException {
+
+        return XmlUtil.readXmlToDocument("<rpc-reply message-id=\"101\"\n" +
+                "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                "<data>\n" +
+                "</data>\n" +
+                "</rpc-reply>");
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/NotificationsTransformUtilTest.java b/opendaylight/netconf/netconf-notifications-impl/src/test/java/org/opendaylight/controller/netconf/notifications/impl/ops/NotificationsTransformUtilTest.java
new file mode 100644 (file)
index 0000000..c4bc41c
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+
+public class NotificationsTransformUtilTest {
+
+    private static final Date DATE = new Date();
+    private static final String innerNotification = "<netconf-capability-change xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-notifications\">" +
+            "<deleted-capability>uri3</deleted-capability>" +
+            "<deleted-capability>uri4</deleted-capability>" +
+            "<added-capability>uri1</added-capability>" +
+            "</netconf-capability-change>";
+
+    private static final String expectedNotification = "<notification xmlns=\"urn:ietf:params:netconf:capability:notification:1.0\">" +
+            innerNotification +
+            "<eventTime>" + new SimpleDateFormat(NetconfNotification.RFC3339_DATE_FORMAT_BLUEPRINT).format(DATE) + "</eventTime>" +
+            "</notification>";
+
+    @Test
+    public void testTransform() throws Exception {
+        final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
+
+        netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(new Uri("uri1"), new Uri("uri1")));
+        netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(new Uri("uri3"), new Uri("uri4")));
+
+        final NetconfCapabilityChange capabilityChange = netconfCapabilityChangeBuilder.build();
+        final NetconfNotification transform = NotificationsTransformUtil.transform(capabilityChange, DATE);
+
+        final String serialized = XmlUtil.toString(transform.getDocument());
+
+        XMLUnit.setIgnoreWhitespace(true);
+        final Diff diff = XMLUnit.compareXML(expectedNotification, serialized);
+        assertTrue(diff.toString(), diff.similar());
+    }
+
+    @Test
+    public void testTransformFromDOM() throws Exception {
+        final NetconfNotification netconfNotification = new NetconfNotification(XmlUtil.readXmlToDocument(innerNotification), DATE);
+
+        XMLUnit.setIgnoreWhitespace(true);
+        final Diff diff = XMLUnit.compareXML(expectedNotification, netconfNotification.toString());
+        assertTrue(diff.toString(), diff.similar());
+    }
+
+}
\ No newline at end of file
index e1aa6ce3ed8184d979682df63281fe41311004e4..653dd70b2986bde1bcb91f058899e949478c31dd 100644 (file)
@@ -13,9 +13,6 @@
   <version>0.3.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>${project.artifactId}</name>
-  <prerequisites>
-    <maven>3.0.4</maven>
-  </prerequisites>
 
   <modules>
     <module>netconf-api</module>
     <module>netconf-ssh</module>
     <module>netconf-tcp</module>
     <module>netconf-monitoring</module>
+    <module>ietf-netconf</module>
     <module>ietf-netconf-monitoring</module>
+    <module>ietf-netconf-notifications</module>
     <module>ietf-netconf-monitoring-extension</module>
     <module>netconf-connector-config</module>
     <module>netconf-auth</module>
     <module>netconf-usermanager</module>
     <module>netconf-testtool</module>
+    <module>netconf-notifications-impl</module>
+    <module>netconf-notifications-api</module>
 
     <module>netconf-artifacts</module>
   </modules>
diff --git a/pom.xml b/pom.xml
index d1a2d8ed41024f76e0c58f39886b2547535a621a..1c4bc4d4d55241caf36c7c975a72d16e5b962da7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   <name>controller</name>
   <!-- Used by Sonar to set project name -->
 
-  <prerequisites>
-    <maven>3.0</maven>
-  </prerequisites>
-
   <modules>
 
     <!-- md-sal -->