Merge "Fix test case mis-spelling."
authorEd Warnicke <eaw@cisco.com>
Tue, 6 Jan 2015 23:36:01 +0000 (23:36 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 6 Jan 2015 23:36:01 +0000 (23:36 +0000)
59 files changed:
features/config/pom.xml
features/config/src/main/resources/features.xml
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
karaf/karaf-branding/.gitignore [moved from opendaylight/karaf-branding/.gitignore with 100% similarity]
karaf/karaf-branding/pom.xml [moved from opendaylight/karaf-branding/pom.xml with 100% similarity]
karaf/karaf-branding/src/main/resources/org/apache/karaf/branding/branding.properties [moved from opendaylight/karaf-branding/src/main/resources/org/apache/karaf/branding/branding.properties with 100% similarity]
karaf/karaf-parent/pom.xml [new file with mode: 0644]
karaf/opendaylight-karaf-empty/pom.xml [moved from opendaylight/distribution/opendaylight-karaf-empty/pom.xml with 99% similarity]
karaf/opendaylight-karaf-resources/pom.xml [moved from opendaylight/distribution/opendaylight-karaf-resources/pom.xml with 72% similarity]
karaf/opendaylight-karaf-resources/src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg [new file with mode: 0644]
karaf/opendaylight-karaf-resources/src/main/resources/bin/instance [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/bin/instance.bat [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance.bat with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/configuration/context.xml [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/context.xml with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/etc/custom.properties [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties with 100% similarity]
karaf/opendaylight-karaf-resources/src/main/resources/version.properties [moved from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/version.properties with 100% similarity]
karaf/opendaylight-karaf/pom.xml [moved from opendaylight/distribution/opendaylight-karaf/pom.xml with 99% similarity]
karaf/pom.xml [new file with mode: 0644]
opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java
opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java
opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java
opendaylight/config/config-parent/pom.xml [new file with mode: 0644]
opendaylight/config/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitGroup.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitMeter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNodeRegistrationImpl.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitPort.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java
pom.xml

index 461427c7cee08e1c0ff67f4cbcdf292c64585d13..1fa248615c06a0974d5d741eaca91c03c579e06f 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-common</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-common-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-common-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-common-util</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>config-api</artifactId>
index b4dd03f4910d6761e7415c7d657851e15de356f5..b2e0b246ef822128037a7318cfee2483fd580d4a 100644 (file)
@@ -6,21 +6,12 @@
   <repository>mvn:org.opendaylight.yangtools/features-yangtools/${yangtools.version}/xml/features</repository>
 
   <feature name='odl-config-all' version='${project.version}' description="OpenDaylight :: Config :: All">
-      <feature version='${mdsal.version}'>odl-mdsal-common</feature>
       <feature version='${project.version}'>odl-config-api</feature>
       <feature version='${project.version}'>odl-config-netty-config-api</feature>
       <feature version='${project.version}'>odl-config-core</feature>
       <feature version='${project.version}'>odl-config-manager</feature>
   </feature>
 
-  <feature name='odl-mdsal-common' version='${mdsal.version}' description="OpenDaylight :: Config :: All">
-      <feature version='${yangtools.version}'>odl-yangtools-data-binding</feature>
-      <bundle>mvn:org.opendaylight.controller/sal-common/${mdsal.version}</bundle>
-      <bundle>mvn:org.opendaylight.controller/sal-common-api/${mdsal.version}</bundle>
-      <bundle>mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version}</bundle>
-      <bundle>mvn:org.opendaylight.controller/sal-common-util/${mdsal.version}</bundle>
-  </feature>
-
   <feature name='odl-config-api' version='${project.version}' description="OpenDaylight :: Config :: API">
     <bundle>mvn:org.opendaylight.controller/config-api/${project.version}</bundle>
     <feature version='${yangtools.version}'>odl-yangtools-common</feature>
@@ -39,7 +30,6 @@
     <feature version='${yangtools.version}'>odl-yangtools-common</feature>
     <feature version='${yangtools.version}'>odl-yangtools-binding</feature>
     <feature version='${yangtools.version}'>odl-yangtools-binding-generator</feature>
-    <feature version='${mdsal.version}'>odl-mdsal-common</feature>
     <feature version='${project.version}'>odl-config-api</feature>
     <bundle>mvn:org.opendaylight.controller/config-util/${project.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/yang-jmx-generator/${project.version}</bundle>
@@ -53,4 +43,4 @@
     <feature version='${project.version}'>odl-config-core</feature>
     <bundle>mvn:org.opendaylight.controller/config-manager/${project.version}</bundle>
   </feature>
-</features>
\ No newline at end of file
+</features>
index d81da186b9490b4038f42fe668cca5a397dd03d3..5e6afd248f261ec5642ebd87592569bdec059054 100644 (file)
       <artifactId>sal-akka-raft</artifactId>
       <version>${mdsal.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-util</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-core-spi</artifactId>
index 540cea1bbc299ec89ea8e11f17a92ec480f602a1..1582f457897d8c2eabf20e833de3dcb36e946742 100644 (file)
         <feature version='${project.version}'>odl-mdsal-xsql</feature>
         <feature version='${project.version}'>odl-toaster</feature>
     </feature>
+    <feature name='odl-mdsal-common' version='${mdsal.version}' description="OpenDaylight :: Config :: All">
+      <feature version='${yangtools.version}'>odl-yangtools-data-binding</feature>
+      <bundle>mvn:org.opendaylight.controller/sal-common/${mdsal.version}</bundle>
+      <bundle>mvn:org.opendaylight.controller/sal-common-api/${mdsal.version}</bundle>
+      <bundle>mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version}</bundle>
+      <bundle>mvn:org.opendaylight.controller/sal-common-util/${mdsal.version}</bundle>
+    </feature>
     <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <feature version='${yangtools.version}'>odl-yangtools-common</feature>
         <feature version='${yangtools.version}'>odl-yangtools-binding</feature>
diff --git a/karaf/karaf-parent/pom.xml b/karaf/karaf-parent/pom.xml
new file mode 100644 (file)
index 0000000..06d8c8d
--- /dev/null
@@ -0,0 +1,345 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.opendaylight.odlparent</groupId>
+    <artifactId>odlparent</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath/>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.opendaylight.controller</groupId>
+  <artifactId>karaf-parent</artifactId>
+  <name>${project.artifactId}</name>
+  <packaging>pom</packaging>
+  <prerequisites>
+    <maven>3.1.1</maven>
+  </prerequisites>
+  <properties>
+    <branding.version>1.1.0-SNAPSHOT</branding.version>
+    <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <!-- scope is compile so all features (there is only one) are installed
+        into startup.properties and the feature repo itself is not installed -->
+        <groupId>org.apache.karaf.features</groupId>
+        <artifactId>framework</artifactId>
+        <version>${karaf.version}</version>
+        <type>kar</type>
+        <exclusions>
+          <exclusion>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-core</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+  <dependencies>
+    <!-- ODL Branding -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>karaf.branding</artifactId>
+      <version>${branding.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <!-- Resources needed -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>opendaylight-karaf-resources</artifactId>
+      <version>${karaf.resources.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.felix</groupId>
+                    <artifactId>maven-bundle-plugin</artifactId>
+                    <versionRange>[0,)</versionRange>
+                    <goals>
+                      <goal>cleanVersions</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[0,)</versionRange>
+                    <goals>
+                      <goal>copy</goal>
+                      <goal>unpack</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.karaf.tooling</groupId>
+                    <artifactId>karaf-maven-plugin</artifactId>
+                    <versionRange>[0,)</versionRange>
+                    <goals>
+                      <goal>commands-generate-help</goal>
+                      <goal>features-add-to-repository</goal>
+                      <goal>install-kars</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.fusesource.scalate</groupId>
+                    <artifactId>maven-scalate-plugin</artifactId>
+                    <versionRange>[0,)</versionRange>
+                    <goals>
+                      <goal>sitegen</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.servicemix.tooling</groupId>
+                    <artifactId>depends-maven-plugin</artifactId>
+                    <versionRange>[0,)</versionRange>
+                    <goals>
+                      <goal>generate-depends-file</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+
+        <plugin>
+          <artifactId>maven-resources-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>copy-resources</id>
+              <!-- here the phase you need -->
+              <phase>prepare-package</phase>
+              <goals>
+                <goal>copy-resources</goal>
+              </goals>
+              <configuration>
+                <outputDirectory>${basedir}/target/assembly</outputDirectory>
+                <resources>
+                  <resource>
+                    <directory>src/main/assembly</directory>
+                  </resource>
+                </resources>
+                <overwrite>true</overwrite>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.karaf.tooling</groupId>
+          <artifactId>karaf-maven-plugin</artifactId>
+          <version>${karaf.version}</version>
+          <extensions>true</extensions>
+          <configuration>
+            <!-- no startupFeatures -->
+            <bootFeatures>
+              <feature>standard</feature>
+              <feature>${karaf.localFeature}</feature>
+            </bootFeatures>
+            <!-- no installedFeatures -->
+          </configuration>
+          <executions>
+            <execution>
+              <id>populate-system</id>
+              <phase>generate-resources</phase>
+              <goals>
+                <goal>features-add-to-repository</goal>
+              </goals>
+              <configuration>
+                <descriptors>
+                  <descriptor>mvn:org.apache.karaf.features/standard/${karaf.version}/xml/features</descriptor>
+                </descriptors>
+                <features>
+                  <feature>standard</feature>
+                  <feature>config</feature>
+                  <feature>package</feature>
+                  <feature>kar</feature>
+                  <feature>ssh</feature>
+                  <feature>management</feature>
+                  <feature>war</feature>
+                </features>
+                <repository>target/assembly/system</repository>
+              </configuration>
+            </execution>
+            <execution>
+              <id>process-resources</id>
+              <goals>
+                <goal>install-kars</goal>
+              </goals>
+              <phase>process-resources</phase>
+            </execution>
+            <execution>
+              <id>package</id>
+              <goals>
+                <goal>instance-create-archive</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-checkstyle-plugin</artifactId>
+          <version>${checkstyle.version}</version>
+          <configuration>
+            <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/configuration\/initial\/</excludes>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-dependency-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>copy</id>
+              <goals>
+                <goal>copy</goal>
+              </goals>
+              <!-- here the phase you need -->
+              <phase>generate-resources</phase>
+              <configuration>
+                <artifactItems>
+                  <artifactItem>
+                    <groupId>org.opendaylight.controller</groupId>
+                    <artifactId>karaf.branding</artifactId>
+                    <version>${karaf.branding.version}</version>
+                    <outputDirectory>target/assembly/lib</outputDirectory>
+                    <destFileName>karaf.branding-${branding.version}.jar</destFileName>
+                  </artifactItem>
+                </artifactItems>
+              </configuration>
+            </execution>
+            <execution>
+              <id>unpack-karaf-resources</id>
+              <goals>
+                <goal>unpack-dependencies</goal>
+              </goals>
+              <phase>prepare-package</phase>
+              <configuration>
+                <outputDirectory>${project.build.directory}/assembly</outputDirectory>
+                <groupId>org.opendaylight.controller</groupId>
+                <includeArtifactIds>opendaylight-karaf-resources</includeArtifactIds>
+                <excludes>META-INF\/**</excludes>
+                <excludeTransitive>true</excludeTransitive>
+                <ignorePermissions>false</ignorePermissions>
+              </configuration>
+            </execution>
+            <execution>
+              <id>org.ops4j.pax.url.mvn.cfg</id>
+              <goals>
+                <goal>copy</goal>
+              </goals>
+              <phase>prepare-package</phase>
+              <configuration>
+                <artifactItems>
+                  <artifactItem>
+                    <groupId>org.opendaylight.controller</groupId>
+                    <artifactId>opendaylight-karaf-resources</artifactId>
+                    <type>properties</type>
+                    <classifier>config</classifier>
+                    <overWrite>true</overWrite>
+                    <outputDirectory>${project.build.directory}/assembly/etc/</outputDirectory>
+                    <destFileName>org.ops4j.pax.url.mvn.cfg</destFileName>
+                  </artifactItem>
+                </artifactItems>
+                <overWriteReleases>true</overWriteReleases>
+                <overWriteSnapshots>true</overWriteSnapshots>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-antrun-plugin</artifactId>
+          <executions>
+            <execution>
+              <phase>prepare-package</phase>
+              <goals>
+                <goal>run</goal>
+              </goals>
+              <configuration>
+                <tasks>
+                  <chmod perm="755">
+                    <fileset dir="${project.build.directory}/assembly/bin">
+                      <include name="karaf"/>
+                      <include name="instance"/>
+                      <include name="start"/>
+                      <include name="stop"/>
+                      <include name="status"/>
+                      <include name="client"/>
+                      <include name="shell"/>
+                    </fileset>
+                  </chmod>
+                </tasks>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.karaf.tooling</groupId>
+        <artifactId>karaf-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
similarity index 99%
rename from opendaylight/distribution/opendaylight-karaf-empty/pom.xml
rename to karaf/opendaylight-karaf-empty/pom.xml
index a66a502a70e540b237799cc348da0b8fe0ef67ed..a13023cbeebdc74a47a7741933202075deef4b1a 100644 (file)
@@ -5,7 +5,7 @@
     <groupId>org.opendaylight.controller</groupId>
     <artifactId>commons.opendaylight</artifactId>
     <version>1.5.0-SNAPSHOT</version>
-    <relativePath>../../commons/opendaylight</relativePath>
+    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
   </parent>
   <artifactId>opendaylight-karaf-empty</artifactId>
   <packaging>pom</packaging>
similarity index 72%
rename from opendaylight/distribution/opendaylight-karaf-resources/pom.xml
rename to karaf/opendaylight-karaf-resources/pom.xml
index 35aac0933945debbc8fb02806e29be4620e37eb3..0b2b8eff1ffe8e2e85deeddb84dafe066e8b8ce7 100644 (file)
@@ -13,7 +13,7 @@
     <groupId>org.opendaylight.controller</groupId>
     <artifactId>commons.opendaylight</artifactId>
     <version>1.5.0-SNAPSHOT</version>
-    <relativePath>../../commons/opendaylight</relativePath>
+    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
   </parent>
   <artifactId>opendaylight-karaf-resources</artifactId>
   <description>Resources for opendaylight-karaf</description>
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg</file>
+                  <type>properties</type>
+                  <classifier>config</classifier>
+                </artifact>
+              </artifacts>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/karaf/opendaylight-karaf-resources/src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg b/karaf/opendaylight-karaf-resources/src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg
new file mode 100644 (file)
index 0000000..9ee45e4
--- /dev/null
@@ -0,0 +1,106 @@
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+#
+# If set to true, the following property will not allow any certificate to be used
+# when accessing Maven repositories through SSL
+#
+#org.ops4j.pax.url.mvn.certificateCheck=
+
+#
+# Path to the local Maven settings file.
+# The repositories defined in this file will be automatically added to the list
+# of default repositories if the 'org.ops4j.pax.url.mvn.repositories' property
+# below is not set.
+# The following locations are checked for the existence of the settings.xml file
+#   * 1. looks for the specified url
+#   * 2. if not found looks for ${user.home}/.m2/settings.xml
+#   * 3. if not found looks for ${maven.home}/conf/settings.xml
+#   * 4. if not found looks for ${M2_HOME}/conf/settings.xml
+#
+#org.ops4j.pax.url.mvn.settings=
+
+#
+# Path to the local Maven repository which is used to avoid downloading
+# artifacts when they already exist locally.
+# The value of this property will be extracted from the settings.xml file
+# above, or defaulted to:
+#     System.getProperty( "user.home" ) + "/.m2/repository"
+#
+org.ops4j.pax.url.mvn.localRepository=${karaf.home}/${karaf.default.repository}
+
+#
+# Default this to false. It's just weird to use undocumented repos
+#
+org.ops4j.pax.url.mvn.useFallbackRepositories=false
+
+#
+# Uncomment if you don't wanna use the proxy settings
+# from the Maven conf/settings.xml file
+#
+# org.ops4j.pax.url.mvn.proxySupport=false
+
+#
+# Disable aether support by default.  This ensure that the defaultRepositories
+# below will be used
+#
+#org.ops4j.pax.url.mvn.disableAether=true
+
+#
+# Comma separated list of repositories scanned when resolving an artifact.
+# Those repositories will be checked before iterating through the
+#    below list of repositories and even before the local repository
+# A repository url can be appended with zero or more of the following flags:
+#    @snapshots  : the repository contains snaphots
+#    @noreleases : the repository does not contain any released artifacts
+#
+# The following property value will add the system folder as a repo.
+#
+#org.ops4j.pax.url.mvn.defaultRepositories=
+
+# Use the default local repo (e.g.~/.m2/repository) as a "remote" repo
+org.ops4j.pax.url.mvn.defaultLocalRepoAsRemote=false
+
+#
+# Comma separated list of repositories scanned when resolving an artifact.
+# The default list includes the following repositories containing releases:
+#    http://repo1.maven.org/maven2
+#    http://repository.apache.org/content/groups/snapshots-group
+#    http://svn.apache.org/repos/asf/servicemix/m2-repo
+#    http://repository.springsource.com/maven/bundles/release
+#    http://repository.springsource.com/maven/bundles/external
+# To add repositories to the default ones, prepend '+' to the list of repositories
+# to add.
+# A repository url can be appended with zero or more of the following flags:
+#    @snapshots  : the repository contains snaphots
+#    @noreleases : the repository does not contain any released artifacts
+#    @id=reponid : the id for the repository, just like in the settings.xml this is optional but recomendet 
+#
+# The default list doesn't contain any repository containing snapshots as it can impact the artifacts resolution.
+# You may want to add the following repositories containing snapshots:
+#    http://repository.apache.org/content/groups/snapshots-group@id=apache@snapshots@noreleases
+#    http://oss.sonatype.org/content/repositories/snapshots@id=sonatype.snapshots.deploy@snapshots@norelease
+#    http://oss.sonatype.org/content/repositories/ops4j-snapshots@id=ops4j.sonatype.snapshots.deploy@snapshots@noreleases
+#
+org.ops4j.pax.url.mvn.repositories= \
+       file:${karaf.home}/${karaf.default.repository}@id=system.repository, \
+    file:${karaf.data}/kar@id=kar.repository@multi, \
+    http://repo1.maven.org/maven2@id=central, \
+    http://repository.springsource.com/maven/bundles/release@id=spring.ebr.release, \
+    http://repository.springsource.com/maven/bundles/external@id=spring.ebr.external
similarity index 99%
rename from opendaylight/distribution/opendaylight-karaf/pom.xml
rename to karaf/opendaylight-karaf/pom.xml
index 4c8f9c5913d65590a513effda7f4efe64ef2c00e..e0ea4c0edffb3758a87e5d33b0576fbd7a58b54b 100644 (file)
@@ -5,7 +5,7 @@
     <groupId>org.opendaylight.controller</groupId>
     <artifactId>commons.opendaylight</artifactId>
     <version>1.5.0-SNAPSHOT</version>
-    <relativePath>../../commons/opendaylight</relativePath>
+    <relativePath>../../opendaylight/commons/opendaylight</relativePath>
   </parent>
   <artifactId>distribution.opendaylight-karaf</artifactId>
   <packaging>pom</packaging>
diff --git a/karaf/pom.xml b/karaf/pom.xml
new file mode 100644 (file)
index 0000000..1d37ae1
--- /dev/null
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../opendaylight/commons/parent</relativePath>
+  </parent>
+
+  <artifactId>karaf-aggregator</artifactId>
+  <packaging>pom</packaging>
+  <modules>
+    <module>karaf-branding</module>
+    <module>karaf-parent</module>
+    <module>opendaylight-karaf</module>
+    <module>opendaylight-karaf-empty</module>
+    <module>opendaylight-karaf-resources</module>
+  </modules>
+</project>
index 80d7083ec0dee0059a8b7c0c33720522b65fb4bc..8c422a52ea783633d386de7ea942fbbdbf4afb38 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -22,6 +21,7 @@ import org.opendaylight.controller.configuration.IConfigurationContainerService;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -73,6 +73,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             props.put("cachenames", propSet);
 
             c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
+                    IInventoryListener.class.getName(),
                     ITopologyManager.class.getName(),
                     ITopologyManagerShell.class.getName(),
                     IConfigurationContainerAware.class.getName(),
index 659ee7dd81ca83d91c013ceddb7017edca9a8b1b..e1a0ca1e7686c39a8e7563e6f50cb81254fe8d99 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -58,6 +59,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -76,6 +79,7 @@ public class TopologyManagerImpl implements
         IConfigurationContainerAware,
         IListenTopoUpdates,
         IObjectReader,
+        IInventoryListener,
         CommandProvider {
     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
@@ -83,6 +87,8 @@ public class TopologyManagerImpl implements
     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+    private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
     private ITopologyService topoService;
     private IClusterContainerServices clusterContainerService;
     private IConfigurationContainerService configurationService;
@@ -104,7 +110,79 @@ public class TopologyManagerImpl implements
     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
     private volatile Boolean shuttingDown = false;
     private Thread notifyThread;
+    private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+        new HashMap<NodeConnector, List<PendingUpdateTask>>();
+    private final BlockingQueue<TopoEdgeUpdate> updateQ =
+        new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private Timer pendingTimer;
+    private Thread updateThread;
+
+    private class PendingEdgeUpdate extends TopoEdgeUpdate {
+        private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+            super(e, p, t);
+        }
+    }
+
+    private class UpdateTopology implements Runnable {
+        @Override
+        public void run() {
+            log.trace("Start topology update thread");
+
+            while (!shuttingDown) {
+                try {
+                    List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+                    TopoEdgeUpdate teu = updateQ.take();
+                    for (; teu != null; teu = updateQ.poll()) {
+                        list.add(teu);
+                    }
+
+                    if (!list.isEmpty()) {
+                        log.trace("Update edges: {}", list);
+                        doEdgeUpdate(list);
+                    }
+                } catch (InterruptedException e) {
+                    if (shuttingDown) {
+                        break;
+                    }
+                    log.warn("Topology update thread interrupted", e);
+                } catch (Exception e) {
+                    log.error("Exception on topology update thread", e);
+                }
+            }
+
+            log.trace("Exit topology update thread");
+        }
+    }
+
+    private class PendingUpdateTask extends TimerTask {
+        private final Edge  edge;
+        private final Set<Property>  props;
+        private final UpdateType  type;
+
+        private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+            edge = e;
+            props = p;
+            type = t;
+        }
+
+        private NodeConnector getHeadNodeConnector() {
+            return edge.getHeadNodeConnector();
+        }
+
+        private void flush() {
+            log.info("Flush pending topology update: edge {}, type {}",
+                     edge, type);
+            updateQ.add(new PendingEdgeUpdate(edge, props, type));
+        }
 
+        @Override
+        public void run() {
+            if (removePendingEvent(this)) {
+                log.warn("Pending topology update timed out: edge{}, type {}",
+                         edge, type);
+            }
+        }
+    }
 
     void nonClusterObjectCreate() {
         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
@@ -210,6 +288,8 @@ public class TopologyManagerImpl implements
         // Restore the shuttingDown status on init of the component
         shuttingDown = false;
         notifyThread = new Thread(new TopologyNotify(notifyQ));
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -277,6 +357,8 @@ public class TopologyManagerImpl implements
      *
      */
     void started() {
+        updateThread.start();
+
         // Start the batcher thread for the cluster wide topology updates
         notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
@@ -287,7 +369,9 @@ public class TopologyManagerImpl implements
 
     void stop() {
         shuttingDown = true;
+        updateThread.interrupt();
         notifyThread.interrupt();
+        pendingTimer.cancel();
     }
 
     /**
@@ -297,6 +381,9 @@ public class TopologyManagerImpl implements
      *
      */
     void destroy() {
+        updateQ.clear();
+        updateThread = null;
+        pendingTimer = null;
         notifyQ.clear();
         notifyThread = null;
     }
@@ -571,17 +658,100 @@ public class TopologyManagerImpl implements
         return (switchManager.doesNodeConnectorExist(head));
     }
 
+    private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list == null) {
+                list = new LinkedList<PendingUpdateTask>();
+                pendingUpdates.put(head, list);
+            }
+            list.add(task);
+            pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+        }
+    }
+
+    private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                log.warn("Enqueue edge update: edge {}, type {}", e, t);
+                PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+                list.add(task);
+                pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean removePendingEvent(PendingUpdateTask t) {
+        t.cancel();
+        NodeConnector head = t.getHeadNodeConnector();
+        boolean removed = false;
+
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                removed = list.remove(t);
+                if (list.isEmpty()) {
+                    pendingUpdates.remove(head);
+                }
+            }
+        }
+
+        return removed;
+    }
+
+    private void removePendingEvent(NodeConnector head, boolean doFlush) {
+        List<PendingUpdateTask> list;
+        synchronized (pendingUpdates) {
+            list = pendingUpdates.remove(head);
+        }
+
+        if (list != null) {
+            for (PendingUpdateTask task : list) {
+                if (task.cancel() && doFlush) {
+                    task.flush();
+                }
+            }
+            pendingTimer.purge();
+        }
+    }
+
     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
-        switch (type) {
-        case ADDED:
+        return edgeUpdate(e, type, props, false);
+    }
 
+    private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+        if (!type.equals(UpdateType.ADDED) &&
+            enqueueEventIfPending(e, props, type)) {
+            return null;
+        }
 
+        switch (type) {
+        case ADDED:
             if (this.edgesDB.containsKey(e)) {
                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
                 log.trace("Skipping redundant edge addition: {}", e);
                 return null;
             }
 
+            // Ensure that head node connector exists
+            if (!isPending) {
+                if (headNodeConnectorExist(e)) {
+                    removePendingEvent(e.getHeadNodeConnector(), true);
+                } else {
+                    log.warn("Ignore edge that contains invalid node connector: {}",
+                             e);
+                    addPendingEvent(e, props, type);
+                    return null;
+                }
+            }
+
             // Make sure the props are non-null or create a copy
             if (props == null) {
                 props = new HashSet<Property>();
@@ -589,13 +759,6 @@ public class TopologyManagerImpl implements
                 props = new HashSet<Property>(props);
             }
 
-
-            // Ensure that head node connector exists
-            if (!headNodeConnectorExist(e)) {
-                log.warn("Ignore edge that contains invalid node connector: {}", e);
-                return null;
-            }
-
             // Check if nodeConnectors of the edge were correctly categorized
             // by protocol plugin
             crossCheckNodeConnectors(e);
@@ -702,16 +865,16 @@ public class TopologyManagerImpl implements
         return new TopoEdgeUpdate(e, props, type);
     }
 
-    @Override
-    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+    private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
-        for (int i = 0; i < topoedgeupdateList.size(); i++) {
-            Edge e = topoedgeupdateList.get(i).getEdge();
-            Set<Property> p = topoedgeupdateList.get(i).getProperty();
-            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
-            TopoEdgeUpdate teu = edgeUpdate(e, type, p);
-            if (teu != null) {
-                teuList.add(teu);
+        for (TopoEdgeUpdate teu : topoedgeupdateList) {
+            boolean isPending = (teu instanceof PendingEdgeUpdate);
+            Edge e = teu.getEdge();
+            Set<Property> p = teu.getProperty();
+            UpdateType type = teu.getUpdateType();
+            TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+            if (update != null) {
+                teuList.add(update);
             }
         }
 
@@ -727,6 +890,11 @@ public class TopologyManagerImpl implements
         }
     }
 
+    @Override
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        updateQ.addAll(topoedgeupdateList);
+    }
+
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
@@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements
         notifyQ.add(upd);
     }
 
+    @Override
+    public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+        // NOP
+    }
+
+    @Override
+    public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+        // Remove pending edge updates for the given node connector.
+        // Pending events should be notified if the node connector exists.
+        boolean doFlush = !type.equals(UpdateType.REMOVED);
+        removePendingEvent(nc, doFlush);
+    }
+
     @Override
     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
         if (cacheName.equals(TOPOEDGESDB)) {
@@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements
         return result;
     }
 
+    // Only for unit test.
+    void startTest() {
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
+        updateThread.start();
+    }
+
+    void stopTest() {
+        shuttingDown = true;
+        updateThread.interrupt();
+        pendingTimer.cancel();
+    }
+
+    boolean flushUpdateQueue(long timeout) {
+        long limit = System.currentTimeMillis() + timeout;
+        long cur;
+        do {
+            if (updateQ.peek() == null) {
+                return true;
+            }
+
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                break;
+            }
+            cur = System.currentTimeMillis();
+        } while (cur < limit);
+
+        return false;
+    }
 }
index d1338bf6953909aff8ff1c4bea274001f9135e5c..600f1d8cbfafb05162be7cba530e6b4c05505d3a 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.topologymanager.internal;
 
 import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
@@ -50,6 +52,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 public class TopologyManagerImplTest {
+    private TopologyManagerImpl topoManagerImpl;
+
     /**
      * Mockup of switch manager that only maintains existence of node
      * connector.
@@ -78,6 +82,11 @@ public class TopologyManagerImplTest {
             }
         }
 
+        private void clear() {
+            nodeSet.clear();
+            nodeConnectorSet.clear();
+        }
+
         @Override
         public Status addSubnet(SubnetConfig configObject) {
             return null;
@@ -325,6 +334,20 @@ public class TopologyManagerImplTest {
         }
     }
 
+    @Before
+    public void setUp() {
+        topoManagerImpl = new TopologyManagerImpl();
+        topoManagerImpl.startTest();
+    }
+
+    @After
+    public void tearDown() {
+        if (topoManagerImpl != null) {
+            topoManagerImpl.stopTest();
+            topoManagerImpl = null;
+        }
+    }
+
     /*
      * Sets the node, edges and properties for edges here: Edge <SwitchId :
      * NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
@@ -375,11 +398,12 @@ public class TopologyManagerImplTest {
             topoedgeupdateList.add(teu2);
             topoManagerImpl.edgeUpdate(topoedgeupdateList);
         }
+
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
     }
 
     @Test
     public void testGetNodeEdges() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
@@ -412,7 +436,6 @@ public class TopologyManagerImplTest {
 
     @Test
     public void testGetEdges() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         setNodeEdges(topoManagerImpl, swMgr);
@@ -496,7 +519,6 @@ public class TopologyManagerImplTest {
         TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
                 "OF|10@OF|20", "OF|10@OF|30");
 
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -529,7 +551,6 @@ public class TopologyManagerImplTest {
     public void testGetUserLink() {
         TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
         TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -614,7 +635,6 @@ public class TopologyManagerImplTest {
     @Test
     public void testHostLinkMethods() throws ConstructionException,
     UnknownHostException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -678,7 +698,6 @@ public class TopologyManagerImplTest {
     @Test
     public void testGetNodesWithNodeConnectorHost()
             throws ConstructionException, UnknownHostException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -738,7 +757,6 @@ public class TopologyManagerImplTest {
 
     @Test
     public void bug1348FixTest() throws ConstructionException {
-        TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
         TestSwitchManager swMgr = new TestSwitchManager();
         topoManagerImpl.setSwitchManager(swMgr);
         topoManagerImpl.nonClusterObjectCreate();
@@ -763,7 +781,91 @@ public class TopologyManagerImplTest {
             Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
         }
 
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
         Assert.assertEquals(1, topoManagerImpl.getEdges().size());
         Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
     }
+
+    @Test
+    public void testNotifyNodeConnector() throws ConstructionException {
+        TestSwitchManager swMgr = new TestSwitchManager();
+        topoManagerImpl.setSwitchManager(swMgr);
+        topoManagerImpl.nonClusterObjectCreate();
+
+        // Test NodeConnector notification in the case that there are no
+        // related edge updates.
+        NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector(
+                (short) 1, NodeCreator.createOFNode(1000L));
+        Map<String, Property> propMap = new HashMap<>();
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        swMgr.clear();
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+        // Test NodeConnector notification in the case that there is a related
+        // edge update just before the notification.
+        NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector(
+                (short) 2, NodeCreator.createOFNode(2000L));
+        Edge edge1 = new Edge(nc1, nc2);
+        Edge edge2 = new Edge(nc2, nc1);
+        Set<Property> props = new HashSet<Property>();
+        TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+        TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+        List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        swMgr.addNodeConnectors(nc2);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(2, topoManagerImpl.getEdges().size());
+
+        teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+        teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+        topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+
+        swMgr.clear();
+
+        // Test NodeConnector notification in the case that there are multiple
+        // edge updates related to the NodeConnector just before the notification.
+        teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+        teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+        TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED);
+        TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED);
+        TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+        TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+        topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+        topoedgeupdateList.add(teu1);
+        topoedgeupdateList.add(teu2);
+        topoedgeupdateList.add(teu3);
+        topoedgeupdateList.add(teu4);
+        topoedgeupdateList.add(teu5);
+        topoedgeupdateList.add(teu6);
+        topoManagerImpl.edgeUpdate(topoedgeupdateList);
+        swMgr.addNodeConnectors(nc1);
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+        swMgr.addNodeConnectors(nc2);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+        topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+        topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+        Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+        Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+    }
 }
diff --git a/opendaylight/config/config-parent/pom.xml b/opendaylight/config/config-parent/pom.xml
new file mode 100644 (file)
index 0000000..0b2b634
--- /dev/null
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.opendaylight.yangtools</groupId>
+    <artifactId>binding-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath/>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.opendaylight.controller</groupId>
+  <artifactId>config-parent</artifactId>
+  <version>0.3.0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
+  <properties>
+    <config.version>0.3.0-SNAPSHOT</config.version>
+    <mdsal.version>1.2.0-SNAPSHOT</mdsal.version>
+    <jmxGeneratorPath>src/main/yang-gen-config</jmxGeneratorPath>
+    <config.file>src/main/config/default-config.xml</config.file>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <!-- project specific dependencies -->
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>config-artifacts</artifactId>
+        <version>${config.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>mdsal-artifacts</artifactId>
+        <version>${mdsal.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>config-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-binding-config</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-maven-plugin</artifactId>
+          <dependencies>
+            <dependency>
+              <groupId>org.opendaylight.controller</groupId>
+              <artifactId>yang-jmx-generator-plugin</artifactId>
+              <version>${config.version}</version>
+            </dependency>
+          </dependencies>
+          <executions>
+            <execution>
+              <id>config</id>
+              <goals>
+                <goal>generate-sources</goal>
+              </goals>
+              <configuration>
+                <codeGenerators>
+                  <generator>
+                    <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+                    <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+                    <additionalConfiguration>
+                      <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+                    </additionalConfiguration>
+                  </generator>
+                </codeGenerators>
+                <inspectDependencies>true</inspectDependencies>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <artifactId>maven-clean-plugin</artifactId>
+          <configuration>
+            <filesets>
+              <fileset>
+                <directory>${jmxGeneratorPath}</directory>
+                <includes>
+                  <include>**</include>
+                </includes>
+              </fileset>
+            </filesets>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <profiles>
+    <profile>
+      <activation>
+        <file>
+          <exists>${config.file}</exists>
+        </file>
+      </activation>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>org.codehaus.mojo</groupId>
+              <artifactId>build-helper-maven-plugin</artifactId>
+              <executions>
+                <execution>
+                  <id>attach-artifacts</id>
+                  <goals>
+                    <goal>attach-artifact</goal>
+                  </goals>
+                  <phase>package</phase>
+                  <configuration>
+                    <artifacts>
+                      <artifact>
+                        <file>${config.file}</file>
+                        <type>xml</type>
+                        <classifier>config</classifier>
+                      </artifact>
+                    </artifacts>
+                  </configuration>
+                </execution>
+              </executions>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
index fc447aa7f9a10fe03877e5bb8cb2da4e099836bb..dc13989ab112da7eb83aabd7cf838d1dd25154f0 100644 (file)
@@ -43,6 +43,7 @@
     <module>config-netty-config</module>
 
     <module>config-artifacts</module>
+    <module>config-parent</module>
   </modules>
 
   <dependencies>
index 59bec915118670d92af35553e79f688936288e5c..8022e721579e0d66c8fff33d2955f6197f6607fb 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import akka.japi.Creator;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 
@@ -27,14 +26,8 @@ public class ClientActor extends UntypedActor {
         this.target = target;
     }
 
-    public static Props props(final ActorRef target){
-        return Props.create(new Creator<ClientActor>(){
-            private static final long serialVersionUID = 1L;
-
-            @Override public ClientActor create() throws Exception {
-                return new ClientActor(target);
-            }
-        });
+    public static Props props(final ActorRef target) {
+        return Props.create(ClientActor.class, target);
     }
 
     @Override public void onReceive(Object message) throws Exception {
index 6c65021d86158b22483ada73704f68ba9f83d84c..684c3ac30ecb5cbcb441f90bf430e63e0b5267a7 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.example;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayInputStream;
@@ -53,13 +52,8 @@ public class ExampleActor extends RaftActor {
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
-        final Optional<ConfigParams> configParams){
-        return Props.create(new Creator<ExampleActor>(){
-
-            @Override public ExampleActor create() throws Exception {
-                return new ExampleActor(id, peerAddresses, configParams);
-            }
-        });
+            final Optional<ConfigParams> configParams) {
+        return Props.create(ExampleActor.class, id, peerAddresses, configParams);
     }
 
     @Override public void onReceiveCommand(Object message) throws Exception{
index c0ee09536761847706a59b5c4be9ff3e2127ab23..1676a41c56d7f9c2cfe81a7fc9e46c6828c8d721 100644 (file)
@@ -1,10 +1,8 @@
 package org.opendaylight.controller.cluster.example;
 
-import akka.actor.Actor;
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
 import akka.actor.Props;
-import akka.japi.Creator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -46,12 +44,7 @@ public class ExampleRoleChangeListener extends AbstractUntypedActor implements A
     }
 
     public static Props getProps(final String memberName) {
-        return Props.create(new Creator<Actor>() {
-            @Override
-            public Actor create() throws Exception {
-                return new ExampleRoleChangeListener(memberName);
-            }
-        });
+        return Props.create(ExampleRoleChangeListener.class, memberName);
     }
 
     @Override
index a9aa56174d84a4309c27560dbd1be26c4603de34..d065f6d211be3ecba0bfc1364eb0693e68e42679 100644 (file)
@@ -8,11 +8,9 @@
 
 package org.opendaylight.controller.cluster.notifications;
 
-import akka.actor.Actor;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.japi.Creator;
 import akka.serialization.Serialization;
 import com.google.common.collect.Maps;
 import java.util.Map;
@@ -35,12 +33,7 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
     }
 
     public static Props getProps(final String memberId) {
-        return Props.create(new Creator<Actor>() {
-            @Override
-            public Actor create() throws Exception {
-                return new RoleChangeNotifier(memberId);
-            }
-        });
+        return Props.create(RoleChangeNotifier.class, memberId);
     }
 
     @Override
index 60efb9d7ca1ec163b826b62efd485a7939e88068..b706d20d1adaf1ff0419f647ae17deb056f6af7e 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorSystem;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
-import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import org.junit.After;
 import org.junit.Before;
@@ -80,13 +79,7 @@ public class MeteredBoundedMailboxTest {
         }
 
         public static Props props(final ReentrantLock lock){
-            return Props.create(new Creator<PingPongActor>(){
-                private static final long serialVersionUID = 1L;
-                @Override
-                public PingPongActor create() throws Exception {
-                    return new PingPongActor(lock);
-                }
-            });
+            return Props.create(PingPongActor.class, lock);
         }
 
         @Override
index 9a77e4d568961b72f26dec9716b29a5f5f0b9ccd..6f14af304f403e8340ea904bbbf22a0a1d40673d 100644 (file)
@@ -62,7 +62,11 @@ public class DataChangeListener extends AbstractUntypedActor {
 
         LOG.debug("Sending change notification {} to listener {}", change, listener);
 
-        this.listener.onDataChanged(change);
+        try {
+            this.listener.onDataChanged(change);
+        } catch (RuntimeException e) {
+            LOG.error( String.format( "Error notifying listener %s", this.listener ), e );
+        }
 
         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
         // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
index d5a12c73c57c0deb4fda3be2f11aac435a428bdc..25d47388fe49579fdb66271e4f33ca7effeef314 100644 (file)
@@ -13,6 +13,7 @@ import org.opendaylight.controller.cluster.datastore.messages.EnableNotification
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class DataChangeListenerTest extends AbstractActorTest {
 
@@ -92,4 +93,38 @@ public class DataChangeListenerTest extends AbstractActorTest {
             }
         }};
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testDataChangedWithListenerRuntimeEx(){
+        new JavaTestKit(getSystem()) {{
+            AsyncDataChangeEvent mockChangeEvent1 = Mockito.mock(AsyncDataChangeEvent.class);
+            AsyncDataChangeEvent mockChangeEvent2 = Mockito.mock(AsyncDataChangeEvent.class);
+            AsyncDataChangeEvent mockChangeEvent3 = Mockito.mock(AsyncDataChangeEvent.class);
+
+            AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+            Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
+
+            Props props = DataChangeListener.props(mockListener);
+            ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
+
+            // Let the DataChangeListener know that notifications should be enabled
+            subject.tell(new EnableNotification(true), getRef());
+
+            SchemaContext schemaContext = CompositeModel.createTestContext();
+
+            subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+            expectMsgClass(DataChangedReply.class);
+
+            subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+            expectMsgClass(DataChangedReply.class);
+
+            subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+            expectMsgClass(DataChangedReply.class);
+
+            Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
+            Mockito.verify(mockListener).onDataChanged(mockChangeEvent2);
+            Mockito.verify(mockListener).onDataChanged(mockChangeEvent3);
+        }};
+    }
 }
index 8b4ce31d2ea0ee7e82352e72fd5249b37e6873a6..d24ed5651aa8a5ffa0d554769fb389c5dc05ab78 100644 (file)
@@ -73,6 +73,8 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
                                      config.getRpcManagerName());
 
     LOG.debug("rpc manager started");
+
+    schemaService.registerSchemaContextListener(this);
   }
 
   @Override
index 2046e419d9f2602b444becf6986fe2c73bb9756e..31aac92051f1ee6948cbefcf4c68e49d926c5da0 100644 (file)
@@ -13,14 +13,17 @@ import akka.actor.Props;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Pair;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
 import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
@@ -53,7 +56,7 @@ public class RpcBroker extends AbstractUntypedActor {
     private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
     private final Broker.ProviderSession brokerSession;
     private final ActorRef rpcRegistry;
-    private final SchemaContext schemaContext;
+    private SchemaContext schemaContext;
     private final RemoteRpcProviderConfig config;
 
     private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
@@ -75,9 +78,15 @@ public class RpcBroker extends AbstractUntypedActor {
             invokeRemoteRpc((InvokeRpc) message);
         } else if(message instanceof ExecuteRpc) {
             executeRpc((ExecuteRpc) message);
+        } else if(message instanceof UpdateSchemaContext) {
+            updateSchemaContext((UpdateSchemaContext) message);
         }
     }
 
+    private void updateSchemaContext(UpdateSchemaContext message) {
+        this.schemaContext = message.getSchemaContext();
+    }
+
     private void invokeRemoteRpc(final InvokeRpc msg) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
index a8407129999eade20e9a89f2563f255e94c45ed1..4cbce63f9aa2a9b9f44f4c1ba3a27a225d4c2d42 100644 (file)
@@ -13,8 +13,8 @@ import akka.actor.ActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
-import akka.japi.Creator;
 import akka.japi.Function;
+import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
@@ -26,8 +26,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.util.Set;
-
 /**
  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
  *
@@ -61,17 +59,10 @@ public class RpcManager extends AbstractUntypedActor {
   }
 
 
-  public static Props props(final SchemaContext schemaContext,
-                            final Broker.ProviderSession brokerSession,
-                            final RpcProvisionRegistry rpcProvisionRegistry) {
-    return Props.create(new Creator<RpcManager>() {
-      private static final long serialVersionUID = 1L;
-      @Override
-      public RpcManager create() throws Exception {
-        return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
-      }
-    });
-  }
+    public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
+            final RpcProvisionRegistry rpcProvisionRegistry) {
+        return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
+    }
 
   private void createRpcActors() {
     LOG.debug("Create rpc registry and broker actors");
@@ -123,6 +114,7 @@ public class RpcManager extends AbstractUntypedActor {
 
   private void updateSchemaContext(UpdateSchemaContext message) {
     this.schemaContext = message.getSchemaContext();
+    rpcBroker.tell(message, ActorRef.noSender());
   }
 
   @Override
index fe8c463d2e17fae2d1377aafa0be90dda23beff2..52b1106c873872609e4300abe52d558ee89a6011 100644 (file)
@@ -10,23 +10,22 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.japi.Option;
 import akka.japi.Pair;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 public class RoutingTable implements Copier<RoutingTable>, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+    private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
     private ActorRef router;
 
     @Override
     public RoutingTable copy() {
         RoutingTable copy = new RoutingTable();
-        copy.setTable(new HashMap<>(table));
+        copy.table.putAll(table);
         copy.setRouter(this.getRouter());
 
         return copy;
@@ -35,10 +34,11 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
     public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
         Long updatedTime = table.get(routeId);
 
-        if (updatedTime == null || router == null)
+        if (updatedTime == null || router == null) {
             return Option.none();
-        else
+        } else {
             return Option.option(new Pair<>(router, updatedTime));
+        }
     }
 
     public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
@@ -49,23 +49,16 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
         table.remove(routeId);
     }
 
-    public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+    public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
         return table.containsKey(routeId);
     }
 
-    public Boolean isEmpty(){
+    public boolean isEmpty(){
         return table.isEmpty();
     }
-    ///
-    /// Getter, Setters
-    ///
-    //TODO: Remove public
-    public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
-        return table;
-    }
 
-    void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
-        this.table = table;
+    public int size() {
+        return table.size();
     }
 
     public ActorRef getRouter() {
index 095d70926b90d3838a777cc14a6976b31ccd9c97..845c1c819a70ca6bac0fb1a717b31f7861b6a6b6 100644 (file)
@@ -8,36 +8,21 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
-import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
@@ -45,51 +30,29 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
+public class RpcRegistry extends BucketStore<RoutingTable> {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
-
-    /**
-     * Rpc broker that would use the registry to route requests.
-     */
-    private ActorRef localRouter;
-
-    private RemoteRpcProviderConfig config;
-
     public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-        log.info("Bucket store path = {}", bucketStore.path().toString());
+        getLocalBucket().setData(new RoutingTable());
     }
 
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
-    }
-
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
-        if (message instanceof SetLocalRouter)
+        if (message instanceof SetLocalRouter) {
             receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoutes)
+        } else if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
-
-        else if (message instanceof RemoveRoutes)
+        } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-
-        else if (message instanceof Messages.FindRouters)
+        } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
-
-        else
-            unhandled(message);
+        } else {
+            super.handleReceive(message);
+        }
     }
 
     /**
@@ -98,7 +61,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
     private void receiveSetLocalRouter(SetLocalRouter message) {
-        localRouter = message.getRouter();
+        getLocalBucket().getData().setRouter(message.getRouter());
     }
 
     /**
@@ -106,10 +69,14 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      */
     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-        Preconditions.checkState(localRouter != null, "Router must be set first");
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+
+        RoutingTable table = getLocalBucket().getData().copy();
+        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
+        }
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        updateLocalBucket(table);
     }
 
     /**
@@ -117,9 +84,12 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
+        }
 
+        updateLocalBucket(table);
     }
 
     /**
@@ -128,168 +98,28 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      * @param msg
      */
     private void receiveGetRouter(FindRouters msg) {
-        final ActorRef sender = getSender();
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-    }
-
-    /**
-     * Helper to create empty reply when no routers are found
-     *
-     * @return
-     */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
-    }
-
-    /**
-     * Helper to create a reply when routers are found for the given rpc
-     *
-     * @param buckets
-     * @param routeId
-     * @return
-     */
-    private Messages.FindRoutersReply createReplyWithRouters(
-            Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
-        for (Bucket bucket : buckets.values()) {
-
-            RoutingTable table = (RoutingTable) bucket.getData();
-            if (table == null)
-                continue;
 
-            routerWithUpdateTime = table.getRouterFor(routeId);
-            if (routerWithUpdateTime.isEmpty())
-                continue;
+        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
 
-            routers.add(routerWithUpdateTime.get());
+        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
         }
 
-        return new Messages.FindRoutersReply(routers);
-    }
-
-
-    ///
-    ///private factories to create Mapper
-    ///
-
-    /**
-     * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
-     *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToGetRouter(
-            final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-
-                if (replyMessage instanceof GetAllBucketsReply) {
-
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
-
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
-                    }
-
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-
-                    if (!table.isEmpty()) {
-                        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                            table.removeRoute(routeId);
-                        }
-                    }
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
+        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                        table.addRoute(routeId);
-                    }
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
 
-                return null;
-            }
-        };
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if(!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
     }
 
     /**
index f5dfbc565013b27228af8b31c64a7e8f38446e54..c40fc9349ec059a174e69820fd03639a5bcf45dc 100644 (file)
@@ -11,5 +11,4 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 public interface Bucket<T extends Copier<T>> {
     public Long getVersion();
     public T getData();
-    public void setData(T data);
 }
index 01c77f1f085823928c91dc1feb451ad6a2158c0d..b81175e9a253176870ddf92901d8d2debaa4d698 100644 (file)
@@ -16,6 +16,23 @@ public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable
 
     private T data;
 
+    public BucketImpl() {
+    }
+
+    public BucketImpl(T data) {
+        this.data = data;
+    }
+
+    public BucketImpl(Bucket<T> other) {
+        this.version = other.getVersion();
+        this.data = other.getData();
+    }
+
+    public void setData(T data) {
+        this.data = data;
+        this.version = System.currentTimeMillis()+1;
+    }
+
     @Override
     public Long getVersion() {
         return version;
@@ -23,15 +40,7 @@ public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable
 
     @Override
     public T getData() {
-        if (this.data == null)
-            return null;
-
-        return data.copy();
-    }
-
-    public void setData(T data){
-        this.version = System.currentTimeMillis()+1;
-        this.data = data;
+        return data;
     }
 
     @Override
index 6ffe147e71384b66ece31c4e701294c8630456c8..934609b7cfcfeb8ea14a8c1ff803c53ba938b1aa 100644 (file)
@@ -15,11 +15,10 @@ import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
@@ -28,9 +27,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.utils.ConditionalProbe;
 
@@ -43,24 +39,26 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+
+    private static final Long NO_VERSION = -1L;
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     /**
      * Bucket owned by the node
      */
-    private BucketImpl localBucket = new BucketImpl();
+    private final BucketImpl<T> localBucket = new BucketImpl<>();
 
     /**
      * Buckets ownded by other known nodes in the cluster
      */
-    private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+    private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
 
     /**
      * Bucket version for every known node in the cluster including this node
      */
-    private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+    private final Map<Address, Long> versions = new HashMap<>();
 
     /**
      * Cluster address for this node
@@ -85,7 +83,6 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
     }
 
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         if (probe != null) {
@@ -98,20 +95,14 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
             probe = (ConditionalProbe) message;
             // Send back any message to tell the caller we got the probe.
             getSender().tell("Got it", getSelf());
-        } else if (message instanceof UpdateBucket) {
-            receiveUpdateBucket(((UpdateBucket) message).getBucket());
         } else if (message instanceof GetAllBuckets) {
-            receiveGetAllBucket();
-        } else if (message instanceof GetLocalBucket) {
-            receiveGetLocalBucket();
+            receiveGetAllBuckets();
         } else if (message instanceof GetBucketsByMembers) {
-            receiveGetBucketsByMembers(
-                    ((GetBucketsByMembers) message).getMembers());
+            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
-            receiveUpdateRemoteBuckets(
-                    ((UpdateRemoteBuckets) message).getBuckets());
+            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
         } else {
             if(log.isDebugEnabled()) {
                 log.debug("Unhandled message [{}]", message);
@@ -120,30 +111,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
     }
 
-    /**
-     * Returns a copy of bucket owned by this node
-     */
-    private void receiveGetLocalBucket() {
-        final ActorRef sender = getSender();
-        GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
-        sender.tell(reply, getSelf());
-    }
-
-    /**
-     * Updates the bucket owned by this node
-     *
-     * @param updatedBucket
-     */
-    void receiveUpdateBucket(Bucket updatedBucket){
-
-        localBucket = (BucketImpl) updatedBucket;
-        versions.put(selfAddress, localBucket.getVersion());
-    }
-
     /**
      * Returns all the buckets the this node knows about, self owned + remote
      */
-    void receiveGetAllBucket(){
+    void receiveGetAllBuckets(){
         final ActorRef sender = getSender();
         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
     }
@@ -153,11 +124,12 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      *
      * @return self owned + remote buckets
      */
+    @SuppressWarnings("rawtypes")
     Map<Address, Bucket> getAllBuckets(){
         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
-        all.put(selfAddress, localBucket);
+        all.put(selfAddress, new BucketImpl<>(localBucket));
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
@@ -170,6 +142,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      *
      * @param members requested members
      */
+    @SuppressWarnings("rawtypes")
     void receiveGetBucketsByMembers(Set<Address> members){
         final ActorRef sender = getSender();
         Map<Address, Bucket> buckets = getBucketsByMembers(members);
@@ -182,12 +155,13 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      * @param members requested members
      * @return buckets for requested memebers
      */
+    @SuppressWarnings("rawtypes")
     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
         Map<Address, Bucket> buckets = new HashMap<>();
 
         //first add the local bucket if asked
         if (members.contains(selfAddress)) {
-            buckets.put(selfAddress, localBucket);
+            buckets.put(selfAddress, new BucketImpl<>(localBucket));
         }
 
         //then get buckets for requested remote nodes
@@ -215,8 +189,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
-
+        log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty())
          {
             return; //nothing to do
@@ -229,10 +204,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
 
             Long localVersion = versions.get(entry.getKey());
             if (localVersion == null) {
-                localVersion = -1L;
+                localVersion = NO_VERSION;
             }
 
-            Bucket receivedBucket = entry.getValue();
+            Bucket<T> receivedBucket = entry.getValue();
 
             if (receivedBucket == null) {
                 continue;
@@ -240,7 +215,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
 
             Long remoteVersion = receivedBucket.getVersion();
             if (remoteVersion == null) {
-                remoteVersion = -1L;
+                remoteVersion = NO_VERSION;
             }
 
             //update only if remote version is newer
@@ -249,40 +224,27 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
                 versions.put(entry.getKey(), remoteVersion);
             }
         }
+
         if(log.isDebugEnabled()) {
             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
         }
     }
 
-    ///
-    ///Getter Setters
-    ///
-
-    BucketImpl getLocalBucket() {
+    protected BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
-    void setLocalBucket(BucketImpl localBucket) {
-        this.localBucket = localBucket;
+    protected void updateLocalBucket(T data) {
+        localBucket.setData(data);
+        versions.put(selfAddress, localBucket.getVersion());
     }
 
-    ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+    protected Map<Address, Bucket<T>> getRemoteBuckets() {
         return remoteBuckets;
     }
 
-    void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
-        this.remoteBuckets = remoteBuckets;
-    }
-
-    ConcurrentMap<Address, Long> getVersions() {
+    @VisibleForTesting
+    Map<Address, Long> getVersions() {
         return versions;
     }
-
-    void setVersions(ConcurrentMap<Address, Long> versions) {
-        this.versions = versions;
-    }
-
-    Address getSelfAddress() {
-        return selfAddress;
-    }
 }
index 4e8f2c61c9af22869f785b2632a3df8695b2c48a..b05bd7d0f6f314c990e4a1d7909115b6c1ad1348 100644 (file)
@@ -9,16 +9,14 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.Address;
 import com.google.common.base.Preconditions;
-
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
 
 
 /**
@@ -29,46 +27,13 @@ public class Messages {
 
     public static class BucketStoreMessages{
 
-        public static class GetLocalBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-        }
-
-        public static class ContainsBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-            final private Bucket bucket;
-
-            public ContainsBucket(Bucket bucket){
-                Preconditions.checkArgument(bucket != null, "bucket can not be null");
-                this.bucket = bucket;
-            }
-
-            public Bucket getBucket(){
-                return bucket;
-            }
-
-        }
-
-        public static class UpdateBucket extends ContainsBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-            public UpdateBucket(Bucket bucket){
-                super(bucket);
-            }
-        }
-
-        public static class GetLocalBucketReply extends ContainsBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-            public GetLocalBucketReply(Bucket bucket){
-                super(bucket);
-            }
-        }
-
         public static class GetAllBuckets implements Serializable {
             private static final long serialVersionUID = 1L;
         }
 
         public static class GetBucketsByMembers implements Serializable{
             private static final long serialVersionUID = 1L;
-            private Set<Address> members;
+            private final Set<Address> members;
 
             public GetBucketsByMembers(Set<Address> members){
                 Preconditions.checkArgument(members != null, "members can not be null");
@@ -82,7 +47,7 @@ public class Messages {
 
         public static class ContainsBuckets implements Serializable{
             private static final long serialVersionUID = 1L;
-            private Map<Address, Bucket> buckets;
+            private final Map<Address, Bucket> buckets;
 
             public ContainsBuckets(Map<Address, Bucket> buckets){
                 Preconditions.checkArgument(buckets != null, "buckets can not be null");
@@ -94,11 +59,12 @@ public class Messages {
 
                 for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
                     //ignore null entries
-                    if ( (entry.getKey() == null) || (entry.getValue() == null) )
+                    if ( (entry.getKey() == null) || (entry.getValue() == null) ) {
                         continue;
+                    }
                     copy.put(entry.getKey(), entry.getValue());
                 }
-                return new HashMap<>(copy);
+                return copy;
             }
         }
 
@@ -162,7 +128,7 @@ public class Messages {
 
         public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
             private static final long serialVersionUID = 1L;
-            private Address from;
+            private final Address from;
 
             public GossipStatus(Address from, Map<Address, Long> versions) {
                 super(versions);
index e0d145dbe179b4af06109f1df28151b066b50cd8..5b7b7e4fdc61dfaaa2377078853b691b48b07394 100644 (file)
@@ -1,38 +1,41 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import akka.actor.ChildActorPath;
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.pattern.Patterns;
+import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.utils.ConditionalProbe;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -46,6 +49,8 @@ public class RpcRegistryTest {
     private ActorRef registry2;
     private ActorRef registry3;
 
+    private int routeIdCounter = 1;
+
     @BeforeClass
     public static void staticSetup() throws InterruptedException {
       RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
@@ -97,27 +102,30 @@ public class RpcRegistryTest {
 
         final JavaTestKit mockBroker = new JavaTestKit(node1);
 
-        final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+        Address nodeAddress = node1.provider().getDefaultAddress();
 
         // Add rpc on node 1
         registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
 
-        // install probe
-        final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
-                Messages.BucketStoreMessages.UpdateBucket.class);
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
 
-        registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
 
         // Bucket store should get an update bucket message. Updated bucket contains added rpc.
-        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        Map<Address, Bucket> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+        verifyBucket(buckets.get(nodeAddress), addedRouteIds);
+
+        Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
+        Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
+                versions.get(nodeAddress));
 
         // Now remove rpc
-        registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+        registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
 
         // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
-        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        verifyEmptyBucket(mockBroker, registry1, nodeAddress);
 
         System.out.println("testAddRemoveRpcOnSameNode ending");
 
@@ -136,30 +144,52 @@ public class RpcRegistryTest {
         System.out.println("testRpcAddRemoveInCluster starting");
 
         final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
 
-        // install probe on node2's bucket store
-        final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
-        final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        Address node1Address = node1.provider().getDefaultAddress();
 
         // Add rpc on node 1
         registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
 
         // Bucket store on node2 should get a message to update its local copy of remote buckets
-        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        Map<Address, Bucket> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+        verifyBucket(buckets.get(node1Address), addedRouteIds);
 
         // Now remove
-        registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+        registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
 
-        // Bucket store on node2 should get a message to update its local copy of remote buckets
-        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        // Bucket store on node2 should get a message to update its local copy of remote buckets.
+        // Wait for the bucket for node1 to be empty.
+
+        verifyEmptyBucket(mockBroker2, registry2, node1Address);
 
         System.out.println("testRpcAddRemoveInCluster ending");
     }
 
+    private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
+            throws AssertionError {
+        Map<Address, Bucket> buckets;
+        int nTries = 0;
+        while(true) {
+            buckets = retrieveBuckets(registry1, testKit, address);
+
+            try {
+                verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
+                break;
+            } catch (AssertionError e) {
+                if(++nTries >= 50) {
+                    throw e;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+    }
+
     /**
      * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
      *
@@ -174,76 +204,142 @@ public class RpcRegistryTest {
 
         registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
 
-        // install probe on node 3
-        final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
-        final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
         // Add rpc on node 1
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
         registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
-        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
 
-        // Add same rpc on node 2
+        // Add rpc on node 2
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
         registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-        registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+        registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
+
+        Address node1Address = node1.provider().getDefaultAddress();
+        Address node2Address = node2.provider().getDefaultAddress();
+
+        Map<Address, Bucket> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+                node2Address);
 
-        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        verifyBucket(buckets.get(node1Address), addedRouteIds1);
+        verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+        Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
+        Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
+                versions.get(node1Address));
+        Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
+                versions.get(node2Address));
+
+        RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
+        registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
+
+        FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                FindRoutersReply.class);
+
+        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+        Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+
+        respList.get(0).first().tell("hello", ActorRef.noSender());
+        mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
     }
 
-    private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
-            throws Exception {
-        final JavaTestKit probe = new JavaTestKit(node);
-
-        ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
-            @Override
-            public boolean apply(@Nullable Object input) {
-                if (input != null) {
-                    return clazz.equals(input.getClass());
-                } else {
-                    return false;
-                }
+    private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
+        bucketStore.tell(new GetBucketVersions(), testKit.getRef());
+        GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                GetBucketVersionsReply.class);
+        return reply.getVersions();
+    }
+
+    private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+        RoutingTable table = bucket.getData();
+        Assert.assertNotNull("Bucket RoutingTable is null", table);
+        for(RouteIdentifier<?, ?, ?> r: expRouteIds) {
+            if(!table.contains(r)) {
+                Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
             }
-        });
+        }
 
-        FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
-        Timeout timeout = new Timeout(duration);
-        int maxTries = 30;
-        int i = 0;
-        while(true) {
-            ActorSelection subject = node.actorSelection(subjectPath);
-            Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+        Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
+    }
 
-            try {
-                Await.ready(future, duration);
-                break;
-            } catch (TimeoutException | InterruptedException e) {
-                if(++i > maxTries) {
-                    throw e;
+    private Map<Address, Bucket> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
+            Address... addresses) {
+        int nTries = 0;
+        while(true) {
+            bucketStore.tell(new GetAllBuckets(), testKit.getRef());
+            GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                    GetAllBucketsReply.class);
+
+            Map<Address, Bucket> buckets = reply.getBuckets();
+            boolean foundAll = true;
+            for(Address addr: addresses) {
+                Bucket bucket = buckets.get(addr);
+                if(bucket  == null) {
+                    foundAll = false;
+                    break;
                 }
             }
-        }
 
-        return probe;
+            if(foundAll) {
+                return buckets;
+            }
 
-    }
+            if(++nTries >= 50) {
+                Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+                        + ", Actual: " + buckets);
+            }
 
-    private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
-        return new AddOrUpdateRoutes(createRouteIds());
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
     }
 
-    private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
-        return new RemoveRoutes(createRouteIds());
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAddRoutesConcurrency() throws Exception {
+        final JavaTestKit testKit = new JavaTestKit(node1);
+
+        registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
+
+        final int nRoutes = 500;
+        final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+        for(int i = 0; i < nRoutes; i++) {
+            final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
+                    new QName(new URI("/mockrpc"), "type" + i), null);
+            added[i] = routeId;
+
+            //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+                    ActorRef.noSender());
+        }
+
+        GetAllBuckets getAllBuckets = new GetAllBuckets();
+        FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+        int nTries = 0;
+        while(true) {
+            registry1.tell(getAllBuckets, testKit.getRef());
+            GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+
+            Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+            RoutingTable table = localBucket.getData();
+            if(table != null && table.size() == nRoutes) {
+                for(RouteIdentifier<?, ?, ?> r: added) {
+                    Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+                }
+
+                break;
+            }
+
+            if(++nTries >= 50) {
+                Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
     }
 
     private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
-        QName type = new QName(new URI("/mockrpc"), "mockrpc");
+        QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
         List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
         routeIds.add(new RouteIdentifierImpl(null, type, null));
         return routeIds;
     }
-
 }
index 78fcbd3a1478c9cabe6201893597bf5f1a45008b..ddd08a5f4723f4c6669082c2fe4fcef3c6ff94af 100644 (file)
@@ -12,27 +12,23 @@ import akka.actor.Address;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class BucketStoreTest {
 
     private static ActorSystem system;
-    private static BucketStore store;
 
     @BeforeClass
     public static void setup() {
 
         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
         system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
-        store = createStore();
     }
 
     @AfterClass
@@ -40,21 +36,6 @@ public class BucketStoreTest {
         system.shutdown();
     }
 
-    /**
-     * Given a new local bucket
-     * Should replace
-     */
-    @Test
-    public void testReceiveUpdateBucket(){
-        Bucket bucket = new BucketImpl();
-        Long expectedVersion = bucket.getVersion();
-
-        store.receiveUpdateBucket(bucket);
-
-        Assert.assertEquals(bucket, store.getLocalBucket());
-        Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion());
-    }
-
     /**
      * Given remote buckets
      * Should merge with local copy of remote buckets
@@ -62,6 +43,8 @@ public class BucketStoreTest {
     @Test
     public void testReceiveUpdateRemoteBuckets(){
 
+        BucketStore store = createStore();
+
         Address localAddress = system.provider().getDefaultAddress();
         Bucket localBucket = new BucketImpl();
 
@@ -84,7 +67,7 @@ public class BucketStoreTest {
 
         //Should NOT contain local bucket
         //Should contain ONLY 3 entries i.e a1, a2, a3
-        Map<Address, Bucket> remoteBucketsInStore = store.getRemoteBuckets();
+        Map<Address, Bucket<?>> remoteBucketsInStore = store.getRemoteBuckets();
         Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
         Assert.assertTrue(remoteBucketsInStore.size() == 3);
 
@@ -122,11 +105,9 @@ public class BucketStoreTest {
         Assert.assertTrue(remoteBucketsInStore.size() == 4);
 
         //Should update versions map
-        //versions map contains versions for all remote buckets (4) + local bucket
-        //so it should have total 5.
+        //versions map contains versions for all remote buckets (4).
         Map<Address, Long> versionsInStore = store.getVersions();
-        Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()),
-                          versionsInStore.size() == 5);
+        Assert.assertEquals(4, versionsInStore.size());
         Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
         Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
         Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3));
index 751a68965dc69af4963cf8bd1197cce9590f056d..6124bdf6422d80b3a06591af55af44b623472144 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
@@ -53,14 +54,47 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
      * Internal {@link TransactionChainListener} joining all DS commits
      * to Set of chained changes for prevent often DataStore touches.
      */
-    public interface StatDataStoreOperation {
+    public abstract class StatDataStoreOperation {
+        public enum StatsManagerOperationType {
+            /**
+             * Operation will carry out work related to new node addition /
+             * update
+             */
+            NODE_UPDATE,
+            /**
+             * Operation will carry out work related to node removal
+             */
+            NODE_REMOVAL,
+            /**
+             * Operation will commit data to the operational data store
+             */
+            DATA_COMMIT_OPER_DS
+        }
+
+        private NodeId nodeId;
+        private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+
+        public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
+            if(operType != null){
+                operationType = operType;
+            }
+            nodeId = id;
+        }
+
+        public final StatsManagerOperationType getType() {
+            return operationType;
+        }
+
+        public final NodeId getNodeId(){
+            return nodeId;
+        }
 
         /**
-         * Apply all read / write (put|merge) operation
-         * for DataStore
+         * Apply all read / write (put|merge) operation for DataStore
+         *
          * @param {@link ReadWriteTransaction} tx
          */
-        void applyOperation(ReadWriteTransaction tx);
+        public abstract void applyOperation(ReadWriteTransaction tx);
 
     }
 
index e17c45dc767f5050c4113c4df96b2c6aaf170232..49fe3bbefd25cbb6e4c30646e8e23c67d904532a 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -120,7 +121,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             return;
         }
         /* check flow Capable Node and write statistics */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
@@ -179,7 +180,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             return;
         }
         /* add flow's statistics */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
@@ -218,6 +219,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 /* Notification for continue collecting statistics */
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
         });
     }
 
index 944ccfab5fd4433c37665037101d86e988a9b56d..538b9ef50565b8330e0960c0b192019fe9db959d 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatC
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
@@ -103,7 +104,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
@@ -157,7 +158,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 /* Get and Validate TransactionCacheContainer */
@@ -211,7 +212,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
index 2d5be71dcf914e41f0d8b9d61db9d5d7429ad67d..77d51c364adeeaf656275c1287757433095e9a6c 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatC
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
@@ -101,7 +102,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
@@ -157,7 +158,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 /* Get and Validate TransactionCacheContainer */
@@ -211,7 +212,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
index 77d7c7d312345a74451fc45c4c1f972ed23fd9c0..1bff3deba563d006e0890a16180cfb1b5602ec82 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
@@ -95,7 +96,7 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
index afb45e59f0a6c6effd7856e02bfbdc3e10a47116..4169725f4aec5256c5e3e6613b18cb0b2cb6b9ae 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -40,6 +41,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRem
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -114,7 +116,8 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
         Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
 
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
@@ -150,7 +153,8 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
         Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
                 "InstanceIdentifier {} is WildCarded!", nodeIdent);
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 manager.disconnectedNodeUnregistration(nodeIdent);
index 131de73f9ded30b75c1d039a9a9d5a9ab22def47..65b5df0d6107b69b3be2d07fd7a8358b5ca0ee2e 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
@@ -82,7 +83,7 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
         final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(nodeId));
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction trans) {
                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
index 53bca87034e263bf0b4b73eb07bb6579dbf63043..2d730645ace5b09089d68bd14c29e774af25a628 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
@@ -81,7 +82,7 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
             return;
         }
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction trans) {
                 final List<FlowTableAndStatisticsMap> tableStats = new ArrayList<FlowTableAndStatisticsMap>(10);
index 4870223c0ff4506e01cb9614b4ef209981ab85fd..20341bcc66e0b961f58e7c52b5122b35f38d3976 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.md.statistics.manager.impl;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -50,6 +51,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
@@ -174,13 +176,15 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
             @Override
             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
                 final TransactionId id = result.getResult().getTransactionId();
+                final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
                 if (id == null) {
-                    LOG.warn("No protocol support");
+                    String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
+                    LOG.warn("Node [{}] does not support statistics request type : {}",
+                            nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
                 } else {
                     if (resultTransId != null) {
                         resultTransId.set(id);
                     }
-                    final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
                     final TransactionCacheContainer<? super TransactionAware> container =
                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
index 1d03e38c165c0a51e3a257730792389abd4d42a5..437c92f6a09e27d1646a893c644bf688c0ad4af7 100644 (file)
@@ -29,6 +29,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
@@ -223,7 +224,18 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private synchronized void cleanDataStoreOperQueue() {
        // Drain all events, making sure any blocked threads are unblocked
        while (! dataStoreOperQueue.isEmpty()) {
-           dataStoreOperQueue.poll();
+           StatDataStoreOperation op = dataStoreOperQueue.poll();
+
+           // Execute the node removal clean up operation if queued in the
+           // operational queue.
+           if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
+               try {
+                   LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
+                   op.applyOperation(null);
+               } catch (final Exception ex) {
+                   LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
+               }
+           }
        }
    }
 
diff --git a/pom.xml b/pom.xml
index 3bfffaa5d468f30444fa88482ac4b63a2a2787f1..1217d720663706fd12f4117a124a86d66d7dd8a7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
     <module>opendaylight/commons/liblldp</module>
 
     <!-- Karaf Distribution -->
-    <module>opendaylight/karaf-branding</module>
-    <module>opendaylight/distribution/opendaylight-karaf-empty</module>
-    <module>opendaylight/distribution/opendaylight-karaf</module>
-    <module>opendaylight/distribution/opendaylight-karaf-resources</module>
+    <module>karaf</module>
     <module>features</module>
 
     <!-- archetypes -->