Implementation for enabling remote rpc calls between 2 instances of md-sal 59/3159/4
authorTony Tkacik <ttkacik@cisco.com>
Thu, 5 Dec 2013 20:51:46 +0000 (21:51 +0100)
committerTony Tkacik <ttkacik@cisco.com>
Sun, 8 Dec 2013 22:39:15 +0000 (23:39 +0100)
 - This provides implementation for enabling remote rpc calls between 2 instances of md-sal.
   The current implementation enables remote execution of globally unique services in the
   cluster. For details, please refer to this wiki page
   (https://wiki.opendaylight.org/view/Zeromq_connector). This wiki page is a draft.
 - Added relativePath in pom so that parent pom can be found.
 - Removed dependency to sal-infinispan-routingtable
 - Exported "impl" as well from zeromq-routingtable. Fixed dependencies in RouterTest.
 - Removed oss.sonatype release repo from md-sal pom. ODL nexus repo mirrors it.
 - Updated server code to handle exception
 - Server code now uses WB pattern instead of listerner pattern.
 - Fixed pom so that parent can be resolved
 - Rebased due to changed in unmerged dependency
 - Added state machine to RpcSocket.
 - Added unit tests to RpcSocketTest and SocketManagerTest.
 - Added CompositeNode methods to ExampleConsumer & XML files for creation of CompositeNodes
 - Added CompositeNode testcases to RouterTest
 - Translated scala code to java
 - Added code to convert CompositeNode to xml and back to help
 - with serialization.
 - Added more unit and integration tests.

This is squash for:

https://git.opendaylight.org/gerrit/2882
https://git.opendaylight.org/gerrit/3022
https://git.opendaylight.org/gerrit/3028
https://git.opendaylight.org/gerrit/3159

Change-Id: I44739fd8ad61043c2e786875bb7787e3fa68e435
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
Signed-off-by: Alex Fan <railor33@gmail.com>
64 files changed:
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/sal/common/util/Rpcs.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractConsumer.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractProvider.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerConfigActivator.xtend
opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModuleFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketPair.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java with 88% similarity]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/MessageWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java with 54% similarity]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RpcRequestImpl.java with 94% similarity]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/FourSimpleChildren.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/pom.xml [moved from opendaylight/md-sal/test/zeromq-test-consumer/pom.xml with 87% similarity]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/FourSimpleChildren.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidCompositeChild.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidSimpleChild.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleChild.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleOneCompositeChild.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoCompositeChildren.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoSimpleChildren.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/pom.xml [moved from opendaylight/md-sal/test/zeromq-test-provider/pom.xml with 89% similarity]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/controller.config [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/logback.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/resources/WEB-INF/web.xml [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/pom.xml [deleted file]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java [deleted file]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java [deleted file]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java [deleted file]
opendaylight/md-sal/test/pom.xml [deleted file]
opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java [deleted file]
opendaylight/md-sal/test/zeromq-test-it/pom.xml [deleted file]
opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java [deleted file]
opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java [deleted file]
opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java [deleted file]
opendaylight/md-sal/zeromq-routingtable/implementation/pom.xml

index adc0c09..4c0b81f 100644 (file)
          <groupId>org.opendaylight.controller.thirdparty</groupId>
          <artifactId>ganymed</artifactId>
         </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-remoterpc-connector</artifactId>
+          <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+              <groupId>org.opendaylight.controller</groupId>
+              <artifactId>
+                  zeromq-routingtable.implementation
+              </artifactId>
+              <version>0.4.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+              <groupId>org.zeromq</groupId>
+              <artifactId>jeromq</artifactId>
+              <version>0.3.1</version>
+          </dependency>
       </dependencies>
     </profile>
     <profile>
index 0e78598..b34621d 100644 (file)
@@ -12,7 +12,7 @@
     </scm>
 
     <modules>
-        <!--  Common APIs & Implementation -->
+        <!-- Common APIs & Implementation -->
         <module>sal-common</module>
         <module>sal-common-api</module>
         <module>sal-common-impl</module>
         <module>sal-connector-api</module>
         <module>sal-rest-connector</module>
         <module>sal-netconf-connector</module>
-        
+
+        <module>zeromq-routingtable/implementation</module>
+        <module>sal-remoterpc-connector/implementation</module>
         <!-- Clustered Data Store -->
         <module>clustered-data-store/implementation</module>
 
         <module>inventory-manager</module>
         <module>statistics-manager</module>
         <module>forwardingrules-manager</module>
-        
+
         <!-- Compability Packages -->
         <module>compatibility</module>
-        <module>zeromq-routingtable/implementation</module>
-        <module>sal-zeromq-connector</module>
     </modules>
 
 
     <profiles>
         <profile>
-           <id>integrationtests</id>
-           <activation>
-               <activeByDefault>false</activeByDefault>
-           </activation>
+            <id>integrationtests</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
             <modules>
                 <module>sal-binding-it</module>
-                <module>zeromq-routingtable/integrationtest</module>
                 <module>clustered-data-store/integrationtest</module>
-                <module>test</module>
+                <!--module>zeromq-routingtable/integrationtest</module -->
+                <!--module>sal-remoterpc-connector/integrationtest</module -->
+                <!--module>test/sal-rest-connector-it</modulei -->
             </modules>
         </profile>
         <profile>
-          <id>IDE</id>
-          <activation>
-            <property>
-              <name>m2e.version</name>
-            </property>
-          </activation>
-          <build>
-            <!-- Put the IDE's build output in a folder other than target, so that IDE builds don't interact with Maven builds -->
-            <directory>target-ide</directory>
-          </build>
+            <id>IDE</id>
+            <activation>
+                <property>
+                    <name>m2e.version</name>
+                </property>
+            </activation>
+            <build>
+                <!-- Put the IDE's build output in a folder other than target, 
+                    so that IDE builds don't interact with Maven builds -->
+                <directory>target-ide</directory>
+            </build>
         </profile>
     </profiles>
 
         <guava.version>14.0.1</guava.version>
         <osgi.core.version>5.0.0</osgi.core.version>
         <junit.version>4.8.1</junit.version>
+        <powermock.version>1.5.1</powermock.version>
+        <mockito.version>1.9.5</mockito.version>
         <xtend.version>2.4.3</xtend.version>
         <maven.clean.plugin.version>2.5</maven.clean.plugin.version>
         <jacoco.version>0.5.3.201107060350</jacoco.version>
+        <sal.version>0.5.1-SNAPSHOT</sal.version>  <!-- AD Sal version -->
+
         <!-- Sonar properties using jacoco to retrieve integration test results -->
         <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
         <sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
     <pluginRepositories>
         <!-- OpenDayLight Repo Mirror -->
         <pluginRepository>
-          <id>opendaylight-mirror</id>
-          <name>opendaylight-mirror</name>
-          <url>${nexusproxy}/groups/public/</url>
-          <snapshots>
-              <enabled>false</enabled>
-          </snapshots>
-          <releases>
-              <enabled>true</enabled>
-              <updatePolicy>never</updatePolicy>
-          </releases>
+            <id>opendaylight-mirror</id>
+            <name>opendaylight-mirror</name>
+            <url>${nexusproxy}/groups/public/</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <releases>
+                <enabled>true</enabled>
+                <updatePolicy>never</updatePolicy>
+            </releases>
         </pluginRepository>
         <!-- OpenDayLight Snapshot artifact -->
         <pluginRepository>
-          <id>opendaylight-snapshot</id>
-          <name>opendaylight-snapshot</name>
-          <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
-          <snapshots>
-              <enabled>true</enabled>
-          </snapshots>
-          <releases>
-              <enabled>false</enabled>
-          </releases>
+            <id>opendaylight-snapshot</id>
+            <name>opendaylight-snapshot</name>
+            <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
         </pluginRepository>
     </pluginRepositories>
 
     <repositories>
         <!-- OpenDayLight Repo Mirror -->
         <repository>
-          <id>opendaylight-mirror</id>
-          <name>opendaylight-mirror</name>
-          <url>${nexusproxy}/groups/public/</url>
-          <snapshots>
-              <enabled>false</enabled>
-          </snapshots>
-          <releases>
-              <enabled>true</enabled>
-              <updatePolicy>never</updatePolicy>
-          </releases>
+            <id>opendaylight-mirror</id>
+            <name>opendaylight-mirror</name>
+            <url>${nexusproxy}/groups/public/</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <releases>
+                <enabled>true</enabled>
+                <updatePolicy>never</updatePolicy>
+            </releases>
         </repository>
         <!-- OpenDayLight Snapshot artifact -->
         <repository>
-          <id>opendaylight-snapshot</id>
-          <name>opendaylight-snapshot</name>
-          <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
-          <snapshots>
-              <enabled>true</enabled>
-          </snapshots>
-          <releases>
-              <enabled>false</enabled>
-          </releases>
+            <id>opendaylight-snapshot</id>
+            <name>opendaylight-snapshot</name>
+            <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
         </repository>
     </repositories>
 
                 <artifactId>yang-data-api</artifactId>
                 <version>${yang.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-data-impl</artifactId>
+                <version>${yang.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.opendaylight.yangtools</groupId>
                 <artifactId>yang-model-api</artifactId>
                 <artifactId>sal-connector-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>sal</artifactId>
+                <version>${sal.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.osgi</groupId>
+                        <artifactId>org.osgi.compendium</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
 
             <!-- Supporting Libraries -->
             <dependency>
                 <artifactId>org.eclipse.xtend.lib</artifactId>
                 <version>${xtend.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.osgi</groupId>
+                <artifactId>org.osgi.core</artifactId>
+                <version>${osgi.core.version}</version>
+            </dependency>
             <!-- Testing Dependencies -->
             <dependency>
                 <groupId>junit</groupId>
             <dependency>
                 <groupId>org.mockito</groupId>
                 <artifactId>mockito-all</artifactId>
-                <version>1.9.5</version>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-module-junit4</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-api-mockito</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-core</artifactId>
+                <version>${powermock.version}</version>
                 <scope>test</scope>
             </dependency>
         </dependencies>
                     <artifactId>maven-bundle-plugin</artifactId>
                     <version>${bundle.plugin.version}</version>
                     <extensions>true</extensions>
-                    <!--executions>
-                        <execution>
-                            <id>bundle-manifest</id>
-                            <phase>process-classes</phase>
-                            <goals>
-                                <goal>manifest</goal>
-                            </goals>
-                        </execution>
-                    </executions-->
+                    <!--executions> <execution> <id>bundle-manifest</id> 
+                        <phase>process-classes</phase> <goals> <goal>manifest</goal> </goals> </execution> 
+                        </executions -->
                     <configuration>
                         <instructions>
                             <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
                     <artifactId>jacoco-maven-plugin</artifactId>
                     <version>${jacoco.version}</version>
                 </plugin>
-                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+                <!--This plugin's configuration is used to store Eclipse 
+                    m2e settings only. It has no influence on the Maven build itself. -->
                 <plugin>
                     <groupId>org.eclipse.m2e</groupId>
                     <artifactId>lifecycle-mapping</artifactId>
                                         </goals>
                                     </pluginExecutionFilter>
                                     <action>
-                                        <ignore/>
+                                        <ignore />
                                     </action>
                                 </pluginExecution>
                                 <pluginExecution>
                                         </goals>
                                     </pluginExecutionFilter>
                                     <action>
-                                      <ignore/>
+                                        <ignore />
                                     </action>
                                 </pluginExecution>
                                 <pluginExecution>
                                         </goals>
                                     </pluginExecutionFilter>
                                     <action>
-                                        <ignore/>
+                                        <ignore />
                                     </action>
                                 </pluginExecution>
                             </pluginExecutions>
index e46b566..54e1a06 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.sal.common.util;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -20,7 +21,7 @@ public class Rpcs {
         return ret;
     }
 
-    private static class RpcResultTO<T> implements RpcResult<T> {
+    private static class RpcResultTO<T> implements RpcResult<T>, Serializable {
 
         private final Collection<RpcError> errors;
         private final T result;
index 1fb73bc..99a38ca 100644 (file)
@@ -13,17 +13,24 @@ import java.util.Collections;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
-public abstract class AbstractConsumer implements Consumer, BundleActivator {
+public abstract class AbstractConsumer implements Consumer, BundleActivator,ServiceTrackerCustomizer<Broker, Broker> {
+
+    
+    
+    
+    private BundleContext context;
+    private ServiceTracker<Broker, Broker> tracker;
+    private Broker broker;
 
-    Broker broker;
-    ServiceReference<Broker> brokerRef;
     @Override
     public final void start(BundleContext context) throws Exception {
+        this.context = context;
         this.startImpl(context);
-        brokerRef = context.getServiceReference(Broker.class);
-        broker = context.getService(brokerRef);
-        broker.registerConsumer(this,context);
+        tracker = new ServiceTracker<>(context, Broker.class, this);
+        tracker.open();
     }
 
 
@@ -32,9 +39,7 @@ public abstract class AbstractConsumer implements Consumer, BundleActivator {
     public final void stop(BundleContext context) throws Exception {
         stopImpl(context);
         broker = null;
-        if(brokerRef != null) {
-            context.ungetService(brokerRef);
-        }
+        tracker.close();
     }
 
     protected void startImpl(BundleContext context) {
@@ -49,4 +54,25 @@ public abstract class AbstractConsumer implements Consumer, BundleActivator {
         return Collections.emptySet();
     }
 
+    
+    @Override
+    public Broker addingService(ServiceReference<Broker> reference) {
+        if(broker == null) {
+            broker = context.getService(reference);
+            broker.registerConsumer(this, context);
+            return broker;
+        }
+        
+        return null;
+    }
+    
+    @Override
+    public void modifiedService(ServiceReference<Broker> reference, Broker service) {
+        // NOOP
+    }
+    
+    @Override
+    public void removedService(ServiceReference<Broker> reference, Broker service) {
+        stopImpl(context);
+    }
 }
index 621ef92..1cb1a2b 100644 (file)
@@ -10,16 +10,20 @@ package org.opendaylight.controller.sal.core.api;
 import java.util.Collection;
 import java.util.Collections;
 
+import javax.naming.Context;
+
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
-public abstract class AbstractProvider implements BundleActivator, Provider {
+public abstract class AbstractProvider implements BundleActivator, Provider,ServiceTrackerCustomizer<Broker, Broker> {
 
-    private ServiceReference<Broker> brokerRef;
     private Broker broker;
-
+    private BundleContext context;
+    private ServiceTracker<Broker, Broker> tracker;
     @Override
     public Collection<ProviderFunctionality> getProviderFunctionality() {
         return Collections.emptySet();
@@ -27,12 +31,10 @@ public abstract class AbstractProvider implements BundleActivator, Provider {
 
     @Override
     public final void start(BundleContext context) throws Exception {
-        brokerRef = context.getServiceReference(Broker.class);
-        broker = context.getService(brokerRef);
-
+        this.context = context;
         this.startImpl(context);
-
-        broker.registerProvider(this,context);
+        tracker = new ServiceTracker<>(context, Broker.class, this);
+        tracker.open();
     }
 
     protected void startImpl(BundleContext context) {
@@ -44,7 +46,31 @@ public abstract class AbstractProvider implements BundleActivator, Provider {
 
     @Override
     public final void stop(BundleContext context) throws Exception {
+        broker = null;
+        tracker.close();
+        tracker = null;
         stopImpl(context);
     }
 
+    @Override
+    public Broker addingService(ServiceReference<Broker> reference) {
+        if(broker == null) {
+            broker = context.getService(reference);
+            broker.registerProvider(this, context);
+            return broker;
+        }
+        
+        return null;
+    }
+    
+    @Override
+    public void modifiedService(ServiceReference<Broker> reference, Broker service) {
+        // NOOP
+    }
+    
+    @Override
+    public void removedService(ServiceReference<Broker> reference, Broker service) {
+        stopImpl(context);
+    }
+    
 }
index 482cfa9..dc116ca 100644 (file)
@@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
 import org.opendaylight.controller.sal.core.api.data.DataStore
 import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareDataStoreAdapter
 import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener
+import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl
 
 class BrokerConfigActivator implements AutoCloseable {
     
@@ -37,7 +38,7 @@ class BrokerConfigActivator implements AutoCloseable {
         val emptyProperties = new Hashtable<String, String>();
         broker.setBundleContext(context);
         
-
+        broker.setRouter(new RpcRouterImpl("Rpc router"))
         schemaService = new SchemaServiceImpl();
         schemaService.setContext(context);
         schemaService.setParser(new YangParserImpl());
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml
new file mode 100644 (file)
index 0000000..b8e0938
--- /dev/null
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-parent</artifactId>
+        <relativePath>../..</relativePath>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sal-remoterpc-connector</artifactId>
+    <packaging>bundle</packaging>
+
+  <properties>
+    <zeromq.version>0.3.1</zeromq.version>
+    <jackson.version>1.9.8</jackson.version>
+    <stax.version>1.0.1</stax.version>
+  </properties>
+
+  <dependencies>
+    <!-- MD Sal interdependencies -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>sal-core-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>sal-connector-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>sal-common-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>zeromq-routingtable.implementation</artifactId>
+      <!-- TODO: fix the version. Why is it not MD Sal project version?-->
+      <version>0.4.1-SNAPSHOT</version>
+    </dependency>
+
+    <!-- AD Sal -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal</artifactId>
+    </dependency>
+
+    <!-- Yang tools -->
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
+
+    <!-- Third Party -->
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.zeromq</groupId>
+      <artifactId>jeromq</artifactId>
+      <version>${zeromq.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId> org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>stax</groupId>
+      <artifactId>stax-api</artifactId>
+      <version>${stax.version}</version>
+    </dependency>
+
+    <!-- Tests -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-core</artifactId>
+    </dependency>
+
+  </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <version>${bundle.plugin.version}</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Import-Package>
+                            *,
+                            !org.codehaus.enunciate.jaxrs
+                        </Import-Package>
+                        <Export-Package>
+                            org.opendaylight.controller.config.yang.md.sal.remote.rpc,
+                            org.opendaylight.controller.sal.connector.remoterpc,
+                            org.opendaylight.controller.sal.connector.remoterpc.*
+                        </Export-Package>
+                        <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                    </instructions>
+                </configuration>
+            </plugin>
+           
+
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+                <version>0.5.9-SNAPSHOT</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                        <configuration>
+                            <codeGenerators>
+                                <generator>
+                                    <codeGeneratorClass>
+                                        org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+                                    </codeGeneratorClass>
+                                    <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>
+                                    <additionalConfiguration>
+                                        <namespaceToPackage1>
+                                            urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang
+                                        </namespaceToPackage1>
+                                    </additionalConfiguration>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+                                    <outputBaseDir>target/site/models</outputBaseDir>
+                                </generator>
+                            </codeGenerators>
+                            <inspectDependencies>true</inspectDependencies>
+                        </configuration>
+                    </execution>
+                </executions>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.opendaylight.controller</groupId>
+                        <artifactId>yang-jmx-generator-plugin</artifactId>
+                        <version>0.2.3-SNAPSHOT</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.opendaylight.yangtools</groupId>
+                        <artifactId>maven-sal-api-gen-plugin</artifactId>
+                        <version>0.6.0-SNAPSHOT</version>
+                        <type>jar</type>
+                    </dependency>
+                </dependencies>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java
new file mode 100644 (file)
index 0000000..606f282
--- /dev/null
@@ -0,0 +1,66 @@
+/**
+* Generated file
+
+* Generated from: yang module name: odl-sal-dom-rpc-remote-cfg  yang module local name: remote-zeromq-rpc-server
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Thu Dec 05 14:25:21 CET 2013
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.Client;
+import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider;
+import org.opendaylight.controller.sal.connector.remoterpc.RoutingTableProvider;
+import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.osgi.framework.BundleContext;
+
+/**
+*
+*/
+public final class ZeroMQServerModule extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule
+ {
+
+    private static final Integer ZEROMQ_ROUTER_PORT = 5554;
+    private BundleContext bundleContext;
+
+    public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+        super(identifier, dependencyResolver);
+    }
+
+    public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+            ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) {
+
+        super(identifier, dependencyResolver, oldModule, oldInstance);
+    }
+
+    @Override
+    protected void customValidation(){
+        // Add custom validation for module attributes here.
+    }
+
+    @Override
+    public java.lang.AutoCloseable createInstance() {
+        
+        Broker broker = getDomBrokerDependency();
+        RoutingTableProvider provider = new RoutingTableProvider(bundleContext);
+        
+        
+        final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
+
+        ServerImpl serverImpl = new ServerImpl(port);
+        
+        Client clientImpl = new Client();
+        RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
+        
+        facade.setRoutingTableProvider(provider );
+        
+        broker.registerProvider(facade, bundleContext);
+        return facade;
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModuleFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModuleFactory.java
new file mode 100644 (file)
index 0000000..3cc3ac0
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+* Generated file
+
+* Generated from: yang module name: odl-sal-dom-rpc-remote-cfg  yang module local name: remote-zeromq-rpc-server
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Thu Dec 05 14:25:21 CET 2013
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+/**
+*
+*/
+public class ZeroMQServerModuleFactory extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModuleFactory
+{
+
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+        ZeroMQServerModule module = (ZeroMQServerModule) super.createModule(instanceName, dependencyResolver, bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+    
+    @Override
+    public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+            DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+        ZeroMQServerModule module = (ZeroMQServerModule) super.createModule(instanceName, dependencyResolver, old,bundleContext);
+        module.setBundleContext(bundleContext);
+        return module;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java
new file mode 100644 (file)
index 0000000..ef31623
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * An implementation of {@link RpcImplementation} that makes remote RPC calls
+ */
+public class Client implements RemoteRpcClient {
+
+    private final Logger _logger = LoggerFactory.getLogger(Client.class);
+
+    private final LinkedBlockingQueue<MessageWrapper> requestQueue = new LinkedBlockingQueue<MessageWrapper>(100);
+
+    private final ExecutorService pool = Executors.newSingleThreadExecutor();
+    private final long TIMEOUT = 5000; // in ms
+
+    private  RoutingTableProvider routingTableProvider;
+
+    public RoutingTableProvider getRoutingTableProvider() {
+        return routingTableProvider;
+    }
+
+    public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+        this.routingTableProvider = routingTableProvider;
+    }
+
+    public LinkedBlockingQueue<MessageWrapper> getRequestQueue() {
+        return requestQueue;
+    }
+
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        // TODO: Find the entries from routing table
+        return Collections.emptySet();
+    }
+
+    public void start() {
+        pool.execute(new Sender(this));
+
+    }
+
+    public void stop() {
+
+        _logger.debug("Client stopping...");
+        Context.getInstance().getZmqContext().term();
+        _logger.debug("ZMQ context terminated");
+
+        pool.shutdown(); // intiate shutdown
+        try {
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow();
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    _logger.error("Client thread pool did not shut down");
+            }
+        } catch (InterruptedException e) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+        _logger.debug("Client stopped");
+    }
+
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+        RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+        routeId.setType(rpc);
+
+        String address = lookupRemoteAddress(routeId);
+
+        Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST)
+                .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId)
+                .payload(XmlUtils.compositeNodeToXml(input)).build();
+
+        List<RpcError> errors = new ArrayList<RpcError>();
+
+        try (SocketPair pair = new SocketPair()) {
+
+            MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender());
+            process(messageWrapper);
+            Message response = parseMessage(pair.getReceiver());
+
+            CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+
+            return Rpcs.getRpcResult(true, payload, errors);
+
+        } catch (Exception e) {
+            collectErrors(e, errors);
+            return Rpcs.getRpcResult(false, null, errors);
+        }
+
+    }
+
+    public void process(MessageWrapper msg) throws TimeoutException, InterruptedException {
+        _logger.debug("Processing message [{}]", msg);
+
+        boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS);
+        if (!success)
+            throw new TimeoutException("Queue is full");
+    }
+
+    /**
+     * Block on socket for reply
+     * 
+     * @param receiver
+     * @return
+     */
+    private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException {
+        return (Message) Message.deserialize(receiver.recv());
+    }
+
+    /**
+     * Find address for the given route identifier in routing table
+     * 
+     * @param routeId
+     *            route identifier
+     * @return remote network address
+     */
+    private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) {
+        checkNotNull(routeId, "route must not be null");
+
+        Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+        checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+        Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
+        checkNotNull(addresses, "Address not found for route [%s]", routeId);
+        checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its
+                                                                                                                         // a
+                                                                                                                         // global
+                                                                                                                         // service.
+
+        String address = addresses.iterator().next();
+        checkNotNull(address, "Address not found for route [%s]", routeId);
+
+        return address;
+    }
+
+    private void collectErrors(Exception e, List<RpcError> errors) {
+        if (e == null)
+            return;
+        if (errors == null)
+            errors = new ArrayList<RpcError>();
+
+        errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+        for (Throwable t : e.getSuppressed()) {
+            errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        stop();
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java
new file mode 100644 (file)
index 0000000..f0bf12c
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.zeromq.ZMQ;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * Provides a ZeroMQ Context object
+ */
+public class Context {
+  private ZMQ.Context zmqContext = ZMQ.context(1);
+  private String uri;
+
+  private static Context _instance = new Context();
+
+  private Context() {}
+
+  public static Context getInstance(){
+    return _instance;
+  }
+
+  public ZMQ.Context getZmqContext(){
+    return this.zmqContext;
+  }
+
+  public String getLocalUri(){
+    uri = (uri != null) ? uri
+            : new StringBuilder("tcp://").append(getIpAddress()).append(":")
+              .append(getRpcPort()).toString();
+
+    return uri;
+  }
+
+  public String getRpcPort(){
+    String rpcPort = (System.getProperty("rpc.port") != null)
+        ? System.getProperty("rpc.port")
+        : "5554";
+
+    return rpcPort;
+  }
+
+  private String getIpAddress(){
+    String ipAddress = (System.getProperty("local.ip") != null)
+        ? System.getProperty("local.ip")
+        : findIpAddress();
+
+    return ipAddress;
+  }
+
+  /**
+   * Finds IPv4 address of the local VM
+   * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+   * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+   * Should we use IP or hostname?
+   *
+   * @return
+   */
+  private String findIpAddress() {
+    String hostAddress = null;
+    Enumeration e = null;
+    try {
+      e = NetworkInterface.getNetworkInterfaces();
+    } catch (SocketException e1) {
+      e1.printStackTrace();
+    }
+    while (e.hasMoreElements()) {
+
+      NetworkInterface n = (NetworkInterface) e.nextElement();
+
+      Enumeration ee = n.getInetAddresses();
+      while (ee.hasMoreElements()) {
+        InetAddress i = (InetAddress) ee.nextElement();
+        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+          hostAddress = i.getHostAddress();
+      }
+    }
+    return hostAddress;
+
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java
new file mode 100644 (file)
index 0000000..6bd123b
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+
+public interface RemoteRpcClient extends RpcImplementation,AutoCloseable{
+
+
+    void setRoutingTableProvider(RoutingTableProvider provider);
+    
+    void stop();
+    
+    void start();
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java
new file mode 100644 (file)
index 0000000..3c2e3b0
--- /dev/null
@@ -0,0 +1,95 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.connector.remoterpc.Client;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public class RemoteRpcProvider implements 
+    RemoteRpcServer,
+    RemoteRpcClient,
+    Provider {
+
+    private final ServerImpl server;
+    private final Client client;
+    private RoutingTableProvider provider;
+
+    @Override
+    public void setRoutingTableProvider(RoutingTableProvider provider) {
+        this.provider = provider;
+        server.setRoutingTableProvider(provider);
+        client.setRoutingTableProvider(provider);
+    }
+    
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+        return client.invokeRpc(rpc, input);
+    }
+    
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        return client.getSupportedRpcs();
+    }
+    
+    
+    public RemoteRpcProvider(ServerImpl server, Client client) {
+        this.server = server;
+        this.client = client;
+    }
+    
+    public void setBrokerSession(ProviderSession session) {
+        server.setBrokerSession(session);
+    }
+    public void setServerPool(ExecutorService serverPool) {
+        server.setServerPool(serverPool);
+    }
+    public void start() {
+        client.setRoutingTableProvider(provider);
+        server.setRoutingTableProvider(provider);
+        server.start();
+        client.start();
+    }
+    public void onRouteUpdated(String key, Set values) {
+        server.onRouteUpdated(key, values);
+    }
+    public void onRouteDeleted(String key) {
+        server.onRouteDeleted(key);
+    }
+    
+    
+    @Override
+    public Collection<ProviderFunctionality> getProviderFunctionality() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    
+    
+    @Override
+    public void onSessionInitiated(ProviderSession session) {
+        server.setBrokerSession(session);
+        start();
+    }
+    
+    
+    public void close() throws Exception {
+        server.close();
+        client.close();
+    }
+
+    
+    
+    
+    @Override
+    public void stop() {
+        server.stop();
+        client.stop();
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java
new file mode 100644 (file)
index 0000000..932600f
--- /dev/null
@@ -0,0 +1,6 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+
+public interface RemoteRpcServer extends AutoCloseable {
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java
new file mode 100644 (file)
index 0000000..cfdf986
--- /dev/null
@@ -0,0 +1,32 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+
+import com.google.common.base.Optional;
+
+public class RoutingTableProvider implements AutoCloseable {
+
+    @SuppressWarnings("rawtypes")
+    final ServiceTracker<RoutingTable,RoutingTable> tracker;
+    
+    
+    public RoutingTableProvider(BundleContext ctx) {
+        @SuppressWarnings("rawtypes")
+        ServiceTracker<RoutingTable, RoutingTable> rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null);
+        tracker = rawTracker;
+        tracker.open();
+    }
+    
+    public Optional<RoutingTable<String, String>> getRoutingTable() {
+        @SuppressWarnings("unchecked")
+        RoutingTable<String,String> tracked = tracker.getService();
+        return Optional.fromNullable(tracked);
+    }
+
+    @Override
+    public void close() throws Exception {
+        tracker.close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java
new file mode 100644 (file)
index 0000000..7e8590a
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}.
+ * It adds following capabilities:
+ * <li> Retry logic - Tries 3 times before giving up
+ * <li> Request times out after {@link TIMEOUT} property
+ * <li> The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before
+ * the response for the 1st request is received. To overcome that, this socket queues all messages until
+ * the previous request has been responded.
+ */
+public class RpcSocket {
+
+  // Constants
+  public static final int TIMEOUT = 2000;
+  public static final int QUEUE_SIZE = 10;
+  public static final int NUM_RETRIES = 3;
+  private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
+
+  private ZMQ.Socket socket;
+  private ZMQ.Poller poller;
+  private String address;
+  private SocketState state;
+  private long sendTime;
+  private int retriesLeft;
+  private LinkedBlockingQueue<MessageWrapper> inQueue;
+
+
+  public RpcSocket(String address, ZMQ.Poller poller) {
+    this.socket = null;
+    this.state = new IdleSocketState();
+    this.sendTime = -1;
+    this.retriesLeft = NUM_RETRIES;
+    this.inQueue = new LinkedBlockingQueue<MessageWrapper>(QUEUE_SIZE);
+    this.address = address;
+    this.poller = poller;
+    createSocket();
+  }
+
+  public ZMQ.Socket getSocket() {
+    return socket;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public int getRetriesLeft() {
+    return retriesLeft;
+  }
+
+  public void setRetriesLeft(int retriesLeft) {
+    this.retriesLeft = retriesLeft;
+  }
+
+  public SocketState getState() {
+    return state;
+  }
+
+  public void setState(SocketState state) {
+    this.state = state;
+  }
+
+  public int getQueueSize() {
+    return inQueue.size();
+  }
+
+  public MessageWrapper removeCurrentRequest() {
+    return inQueue.poll();
+  }
+
+  public boolean hasTimedOut() {
+    return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
+  }
+
+  public void send(MessageWrapper request) throws TimeoutException {
+    try {
+      boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);    
+      if (!success) {
+        throw new TimeoutException("send :: Queue is full");
+      }
+      process();
+    }
+    catch (InterruptedException e) {
+      log.error("send : Thread interrupted while attempting to add request to inQueue", e);
+    }
+  }
+  
+  public MessageWrapper receive() {
+    Message response = parseMessage();
+    MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
+    MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
+
+    state = new IdleSocketState();
+    retriesLeft = NUM_RETRIES;
+    return responseMessageWrapper;
+  }
+  
+  public void process() {
+    if (getQueueSize() > 0) //process if there's message in the queue
+      state.process(this);
+  }
+
+  // Called by IdleSocketState & BusySocketState
+  public void sendMessage() {
+    //Get the message from queue without removing it. For retries
+    MessageWrapper messageWrapper = inQueue.peek();
+    if (messageWrapper != null) {
+      Message message = messageWrapper.getMessage();
+      try {
+        socket.send(Message.serialize(message));
+      }
+      catch (IOException e) {
+        log.debug("Message send failed [{}]", message);
+        log.debug("Exception [{}]", e);
+      }
+      sendTime = System.currentTimeMillis();
+    }
+  }
+  
+  public Message parseMessage() {
+    Message parsedMessage = null;
+    byte[] bytes = socket.recv();
+    log.debug("Received bytes:[{}]", bytes.length);
+    try {
+      parsedMessage = (Message)Message.deserialize(bytes);
+    }
+    catch (IOException|ClassNotFoundException e) {
+      log.debug("parseMessage : Deserializing received bytes failed", e);
+    }
+
+    return parsedMessage;
+  }
+
+  public void recycleSocket() {
+    close();
+  }
+
+  public void close() {
+    socket.setLinger(10);
+    socket.close();
+  }
+
+  private void createSocket() {
+    socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
+    socket.connect(address);
+    poller.register(socket, ZMQ.Poller.POLLIN);
+    state = new IdleSocketState();
+  }
+
+
+  /**
+   * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
+   */
+  public static interface SocketState {
+
+    /* The processing actions to be performed in this state
+     */
+    public void process(RpcSocket socket);
+  }
+
+  /**
+   * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
+   */
+  public static class IdleSocketState implements SocketState {
+
+    @Override
+    public void process(RpcSocket socket) {
+      socket.sendMessage();
+      socket.setState(new BusySocketState());
+      socket.setRetriesLeft(socket.getRetriesLeft()-1);
+    }
+  }
+
+  /**
+   * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
+   */
+  public static class BusySocketState implements SocketState {
+
+    private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
+
+    @Override
+    public void process(RpcSocket socket) {
+      if (socket.hasTimedOut()) {
+        if (socket.getRetriesLeft() > 0) {
+          log.debug("process : Request timed out, retrying now...");
+          socket.sendMessage();
+          socket.setRetriesLeft(socket.getRetriesLeft() - 1);
+        }
+        else {
+          // No more retries for current request, so stop processing the current request
+          MessageWrapper message = socket.removeCurrentRequest();
+          if (message != null) {
+            log.error("Unable to process rpc request [{}]", message);
+            socket.setState(new IdleSocketState());
+            socket.setRetriesLeft(NUM_RETRIES);
+          }
+        }
+      }
+      // Else no timeout, so allow processing to continue
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java
new file mode 100644 (file)
index 0000000..f53d5ad
--- /dev/null
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * Main server thread for sending requests.
+ */
+public class Sender implements Runnable{
+
+  private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
+  private final Client client;
+
+
+  
+  
+  public Sender(Client client) {
+    super();
+    this.client = client;
+  }
+
+@Override
+  public void run() {
+    _logger.info("Starting...");
+
+    try (SocketManager socketManager = new SocketManager()){
+      while (!Thread.currentThread().isInterrupted()) {
+
+        //read incoming messages from blocking queue
+        MessageWrapper request = pollForRequest();
+
+        if (request != null) {
+          processRequest(socketManager, request);
+        }
+
+        flushSockets(socketManager);
+        pollForResponse(socketManager);
+        processResponse(socketManager);
+
+      }
+    } catch(Exception t){
+      _logger.error("Exception: [{}]", t);
+      _logger.error("Stopping...");
+    }
+  }
+
+  private void processResponse(SocketManager socketManager) {
+    for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
+      // If any sockets get a response, process it
+      if (socketManager.getPoller().pollin(i)) {
+        Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
+            socketManager.getPoller().getItem(i).getSocket());
+
+        checkState(socket.isPresent(), "Managed socket not found");
+
+        MessageWrapper response = socket.get().receive();
+        _logger.debug("Received rpc response [{}]", response.getMessage());
+
+        //TODO: handle exception and introduce timeout on receiver side
+        try {
+          response.getReceiveSocket().send(Message.serialize(response.getMessage()));
+        } catch (IOException e) {
+          e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+      }
+    }
+  }
+
+  private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
+
+    if ((request.getMessage() == null) ||
+        (request.getMessage().getRecipient() == null)) {
+      //invalid message. log and drop
+      _logger.error("Invalid request [{}]", request);
+      return;
+    }
+
+    RpcSocket socket =
+        socketManager.getManagedSocket(request.getMessage().getRecipient());
+
+    socket.send(request);
+  }
+
+  private void flushSockets(SocketManager socketManager){
+    for (RpcSocket socket : socketManager.getManagedSockets()){
+      socket.process();
+    }
+  }
+
+  private MessageWrapper pollForRequest(){
+    return client.getRequestQueue().poll();
+  }
+
+  private void pollForResponse(SocketManager socketManager){
+    try{
+      socketManager.getPoller().poll(10); //poll every 10ms
+    }catch (Throwable t) { /*Ignore and continue*/ }
+  }
+}
+
+
+/*
+SCALA
+
+package org.opendaylight.controller.sal.connector.remoterpc
+
+  import org.slf4j.{LoggerFactory, Logger}
+  import scala.collection.JavaConverters._
+  import scala.Some
+  import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
+*/
+/**
+ * Main server thread for sending requests. This does not maintain any state. If the
+ * thread dies, it will be restarted
+ */
+/*class Sender extends Runnable {
+  private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
+
+  override def run = {
+    _logger.info("Sender starting...")
+    val socketManager = new SocketManager()
+
+    try {
+      while (!Thread.currentThread().isInterrupted) {
+        //read incoming messages from blocking queue
+        val request: MessageWrapper = Client.requestQueue.poll()
+
+        if (request != null) {
+          if ((request.message != null) &&
+            (request.message.getRecipient != null)) {
+
+            val socket = socketManager.getManagedSocket(request.message.getRecipient)
+            socket.send(request)
+          } else {
+            //invalid message. log and drop
+            _logger.error("Invalid request [{}]", request)
+          }
+        }
+
+        socketManager.getManagedSockets().asScala.map(s => s.process)
+
+        // Poll all sockets for responses every 1 sec
+        poll(socketManager)
+
+        // If any sockets get a response, process it
+        for (i <- 0 until socketManager.poller.getSize) {
+          if (socketManager.poller.pollin(i)) {
+            val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
+
+            socket match {
+              case None => //{
+                _logger.error("Could not find a managed socket for zmq socket")
+                throw new IllegalStateException("Could not find a managed socket for zmq socket")
+                //}
+              case Some(s) => {
+                val response = s.receive()
+                _logger.debug("Received rpc response [{}]", response.message)
+                response.receiveSocket.send(Message.serialize(response.message))
+              }
+            }
+          }
+        }
+
+      }
+    } catch{
+      case e:Exception => {
+        _logger.debug("Sender stopping due to exception")
+        e.printStackTrace()
+      }
+    } finally {
+      socketManager.stop
+    }
+  }
+
+  def poll(socketManager:SocketManager) = {
+    try{
+      socketManager.poller.poll(10)
+    }catch{
+      case t:Throwable => //ignore and continue
+    }
+  }
+}
+
+
+//    def newThread(r: Runnable): Thread = {
+//      val t = new RequestHandler()
+//      t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
+//      t
+//    }
+
+
+
+/**
+ * Restarts the request processing server in the event of unforeseen exceptions
+ */
+//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
+//  def uncaughtException(t: Thread, e: Throwable) = {
+//    _logger.error("Exception caught during request processing [{}]", e)
+//    _logger.info("Restarting request processor server...")
+//    RequestProcessor.start()
+//  }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java
new file mode 100644 (file)
index 0000000..83b9385
--- /dev/null
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
+ * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
+ * from config file using existing(?) ODL properties framework
+ */
+public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
+
+    private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+
+    private ExecutorService serverPool;
+
+    // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
+    private RoutingTableProvider routingTable;
+    private Set<QName> remoteServices;
+    private ProviderSession brokerSession;
+    private ZMQ.Context context;
+    private ZMQ.Socket replySocket;
+
+    private final RpcListener listener = new RpcListener();
+
+    private final String localUri = Context.getInstance().getLocalUri();
+
+    private final int rpcPort;
+
+    private RpcImplementation client;
+
+    public RpcImplementation getClient() {
+        return client;
+    }
+
+    public void setClient(RpcImplementation client) {
+        this.client = client;
+    }
+
+    // Prevent instantiation
+    public ServerImpl(int rpcPort) {
+        this.rpcPort = rpcPort;
+    }
+
+    public void setBrokerSession(ProviderSession session) {
+        this.brokerSession = session;
+    }
+
+    public ExecutorService getServerPool() {
+        return serverPool;
+    }
+
+    public void setServerPool(ExecutorService serverPool) {
+        this.serverPool = serverPool;
+    }
+
+    public void start() {
+        context = ZMQ.context(1);
+        serverPool = Executors.newSingleThreadExecutor();
+        remoteServices = new HashSet<QName>();
+
+        // Start listening rpc requests
+        serverPool.execute(receive());
+
+        brokerSession.addRpcRegistrationListener(listener);
+        // routingTable.registerRouteChangeListener(routeChangeListener);
+
+        Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+        for (QName rpc : currentlySupported) {
+            listener.onRpcImplementationAdded(rpc);
+        }
+
+        _logger.debug("RPC Server started [{}]", localUri);
+    }
+
+    public void stop() {
+        // TODO: un-subscribe
+
+        // if (context != null)
+        // context.term();
+        //
+        // _logger.debug("ZMQ Context is terminated.");
+
+        if (serverPool != null)
+            serverPool.shutdown();
+
+        _logger.debug("Thread pool is closed.");
+    }
+
+    private Runnable receive() {
+        return new Runnable() {
+            public void run() {
+
+                // Bind to RPC reply socket
+                replySocket = context.socket(ZMQ.REP);
+                replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
+
+                // Poller enables listening on multiple sockets using a single
+                // thread
+                ZMQ.Poller poller = new ZMQ.Poller(1);
+                poller.register(replySocket, ZMQ.Poller.POLLIN);
+                try {
+                    // TODO: Add code to restart the thread after exception
+                    while (!Thread.currentThread().isInterrupted()) {
+
+                        poller.poll();
+
+                        if (poller.pollin(0)) {
+                            handleRpcCall();
+                        }
+                    }
+                } catch (Exception e) {
+                    // log and continue
+                    _logger.error("Unhandled exception [{}]", e);
+                } finally {
+                    poller.unregister(replySocket);
+                    replySocket.close();
+                }
+
+            }
+        };
+    }
+
+    /**
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    private void handleRpcCall() {
+
+        Message request = parseMessage(replySocket);
+
+        _logger.debug("Received rpc request [{}]", request);
+
+        // Call broker to process the message then reply
+        Future<RpcResult<CompositeNode>> rpc = null;
+        RpcResult<CompositeNode> result = null;
+        try {
+            rpc = brokerSession.rpc((QName) request.getRoute().getType(),
+                    XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+
+            result = (rpc != null) ? rpc.get() : null;
+
+        } catch (Exception e) {
+            _logger.debug("Broker threw  [{}]", e);
+        }
+
+        CompositeNode payload = (result != null) ? result.getResult() : null;
+
+        Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
+                .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
+
+        _logger.debug("Sending rpc response [{}]", response);
+
+        try {
+            replySocket.send(Message.serialize(response));
+        } catch (Exception e) {
+            _logger.debug("rpc response send failed for message [{}]", response);
+            _logger.debug("{}", e);
+        }
+
+    }
+
+    /**
+     * @param socket
+     * @return
+     */
+    private Message parseMessage(ZMQ.Socket socket) {
+
+        Message msg = null;
+        try {
+            byte[] bytes = socket.recv();
+            _logger.debug("Received bytes:[{}]", bytes.length);
+            msg = (Message) Message.deserialize(bytes);
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
+        return msg;
+    }
+
+    @Override
+    public void onRouteUpdated(String key, Set values) {
+        RouteIdentifierImpl rId = new RouteIdentifierImpl();
+        try {
+            _logger.debug("Updating key/value {}-{}", key, values);
+            brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
+
+        } catch (Exception e) {
+            _logger.info("Route update failed {}", e);
+        }
+    }
+
+    @Override
+    public void onRouteDeleted(String key) {
+        // TODO: Broker session needs to be updated to support this
+        throw new UnsupportedOperationException();
+    }
+    
+    /**
+     * Listener for rpc registrations
+     */
+    private class RpcListener implements RpcRegistrationListener {
+
+        
+
+        @Override
+        public void onRpcImplementationAdded(QName name) {
+
+            // if the service name exists in the set, this notice
+            // has bounced back from the broker. It should be ignored
+            if (remoteServices.contains(name))
+                return;
+
+            _logger.debug("Adding registration for [{}]", name);
+            RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+            routeId.setType(name);
+
+            try {
+                routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
+                _logger.debug("Route added [{}-{}]", name, localUri);
+            } catch (RoutingTableException | SystemException e) {
+                // TODO: This can be thrown when route already exists in the
+                // table. Broker
+                // needs to handle this.
+                _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+            }
+        }
+
+        @Override
+        public void onRpcImplementationRemoved(QName name) {
+
+            _logger.debug("Removing registration for [{}]", name);
+            RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+            routeId.setType(name);
+
+            try {
+                routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
+            } catch (RoutingTableException | SystemException e) {
+                _logger.error("Route delete failed {}", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        stop();
+    }
+
+    public void setRoutingTableProvider(RoutingTableProvider provider) {
+        this.routingTable = provider;
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java
new file mode 100644 (file)
index 0000000..588a299
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller}
+ */
+public class SocketManager implements AutoCloseable{
+  private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
+
+  /*
+   * RpcSockets mapped by network address its connected to
+   */
+  private ConcurrentHashMap<String, RpcSocket> managedSockets = new ConcurrentHashMap<String, RpcSocket>();
+
+  private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically
+
+  /**
+   * Returns a {@link RpcSocket} for the given address
+   * @param address network address with port eg: 10.199.199.20:5554
+   * @return
+   */
+  public RpcSocket getManagedSocket(String address) throws IllegalArgumentException {
+    //Precondition
+    if (!address.matches("(tcp://)(.*)(:)(\\d*)")) {
+      throw new IllegalArgumentException("Address must of format 'tcp://<ip address>:<port>' but is " + address);
+    }
+
+    if (!managedSockets.containsKey(address)) {
+      log.debug("{} Creating new socket for {}", Thread.currentThread().getName());
+      RpcSocket socket = new RpcSocket(address, _poller);
+      managedSockets.put(address, socket);
+    }
+
+    return managedSockets.get(address);
+  }
+
+  /**
+   * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket}
+   * @param socket
+   * @return
+   */
+  public Optional<RpcSocket> getManagedSocketFor(ZMQ.Socket socket) {
+    for (RpcSocket rpcSocket : managedSockets.values()) {
+      if (rpcSocket.getSocket().equals(socket)) {
+        return Optional.of(rpcSocket);
+      }
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Return a collection of all managed sockets
+   * @return
+   */
+  public Collection<RpcSocket> getManagedSockets() {
+    return managedSockets.values();
+  }
+
+  /**
+   * Returns the {@link ZMQ.Poller}
+   * @return
+   */
+  public ZMQ.Poller getPoller() {
+    return _poller;
+  }
+
+  /**
+   * This should be called when stopping the server to close all the sockets
+   * @return
+   */
+  @Override
+  public void close() throws Exception {
+    log.debug("Stopping...");
+    for (RpcSocket socket : managedSockets.values()) {
+      socket.close();
+    }
+    managedSockets.clear();
+    log.debug("Stopped");
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketPair.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketPair.java
new file mode 100644 (file)
index 0000000..67b3a83
--- /dev/null
@@ -0,0 +1,41 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.zeromq.ZMQ;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+public class SocketPair implements AutoCloseable{
+  private ZMQ.Socket sender;
+  private ZMQ.Socket receiver;
+
+  private static final String INPROC_PREFIX = "inproc://";
+
+  public SocketPair(){
+    String address = new StringBuilder(INPROC_PREFIX)
+                         .append(UUID.randomUUID())
+                         .toString();
+
+    receiver = Context.getInstance().getZmqContext().socket(ZMQ.PAIR);
+    receiver.bind(address);
+
+    sender = Context.getInstance().getZmqContext().socket(ZMQ.PAIR);
+    sender.connect(address);
+  }
+
+  public ZMQ.Socket getSender(){
+    return this.sender;
+  }
+
+  public ZMQ.Socket getReceiver(){
+    return this.receiver;
+  }
+
+  @Override
+  public void close() throws Exception {
+    sender.close();
+    receiver.close();
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java
new file mode 100644 (file)
index 0000000..073601a
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.*;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CompositeNodeImpl implements CompositeNode, Serializable {
+
+  private QName key;
+  private List<Node<?>> children;
+
+  @Override
+  public List<Node<?>> getChildren() {
+    return children;
+  }
+
+  @Override
+  public List<CompositeNode> getCompositesByName(QName children) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CompositeNode> getCompositesByName(String children) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<SimpleNode<?>> getSimpleNodesByName(QName children) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<SimpleNode<?>> getSimpleNodesByName(String children) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public CompositeNode getFirstCompositeByName(QName container) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public SimpleNode<?> getFirstSimpleByName(QName leaf) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public MutableCompositeNode asMutable() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public QName getKey() {
+    return key;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<Node<?>> setValue(List<Node<?>> value) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public int size() {
+    return 0;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return false;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return false;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return false;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<Node<?>> get(Object key) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<Node<?>> put(QName key, List<Node<?>> value) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<Node<?>> remove(Object key) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public void putAll(Map<? extends QName, ? extends List<Node<?>>> m) {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public void clear() {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public Set<QName> keySet() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public Collection<List<Node<?>>> values() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public Set<Entry<QName, List<Node<?>>>> entrySet() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public QName getNodeType() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public CompositeNode getParent() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public List<Node<?>> getValue() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public ModifyAction getModificationAction() {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+}
@@ -6,10 +6,8 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
 
-
-import org.codehaus.jackson.map.ObjectMapper;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 import java.io.*;
@@ -20,7 +18,8 @@ public class Message implements Serializable {
     ANNOUNCE((byte) 0),  //TODO: Remove announce, add rpc registration and deregistration
     HEARTBEAT((byte) 1),
     REQUEST((byte) 2),
-    RESPONSE((byte) 3);
+    RESPONSE((byte) 3),
+    ERROR((byte)4);
 
     private final byte type;
 
@@ -35,6 +34,7 @@ public class Message implements Serializable {
 
   private MessageType type;
   private String sender;
+  private String recipient;
   private RpcRouter.RouteIdentifier route;
   private Object payload;
 
@@ -70,11 +70,19 @@ public class Message implements Serializable {
     this.payload = payload;
   }
 
+  public String getRecipient() {
+    return recipient;
+  }
+
+  public void setRecipient(String recipient) {
+    this.recipient = recipient;
+  }
   @Override
   public String toString() {
     return "Message{" +
         "type=" + type +
         ", sender='" + sender + '\'' +
+        ", recipient='" + recipient + '\'' +
         ", route=" + route +
         ", payload=" + payload +
         '}';
@@ -108,17 +116,6 @@ public class Message implements Serializable {
     return o.readObject();
   }
 
-  public static byte[] toJsonBytes(Message m) throws IOException {
-    ObjectMapper o = new ObjectMapper();
-    return o.writeValueAsBytes(m);
-  }
-
-  public static Message fromJsonBytes(byte [] bytes) throws IOException {
-
-    ObjectMapper o = new ObjectMapper();
-    return o.readValue(bytes, Message.class);
-  }
-
   public static class Response extends Message implements RpcRouter.RpcReply {
     private ResponseCode code; // response code
 
@@ -163,6 +160,11 @@ public class Message implements Serializable {
       return this;
     }
 
+    public MessageBuilder recipient(String recipient){
+      message.setRecipient(recipient);
+      return this;
+    }
+
     public MessageBuilder route(RpcRouter.RouteIdentifier route){
       message.setRoute(route);
       return this;
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/MessageWrapper.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/MessageWrapper.java
new file mode 100644 (file)
index 0000000..8d2198c
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
+
+import org.zeromq.ZMQ;
+
+/**
+ * A class encapsulating {@link Message} and the {@link ZMQ.Socket} over which it is transmitted
+ */
+public class MessageWrapper {
+
+  private Message _message;
+  private ZMQ.Socket _receiveSocket;
+  
+  public MessageWrapper(Message message, ZMQ.Socket receiveSocket) {
+    this._message = message;
+    this._receiveSocket = receiveSocket;
+  }
+
+  public Message getMessage() {
+    return _message;
+  }
+
+  public ZMQ.Socket getReceiveSocket() {
+    return _receiveSocket;
+  }
+}
@@ -5,19 +5,21 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
 
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
 import java.io.Serializable;
+import java.net.URI;
 
-/**
- * User: abhishk2
- */
 public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
 
+  transient ObjectMapper mapper = new ObjectMapper();
+
   private QName context;
   private QName type;
   private InstanceIdentifier route;
@@ -51,10 +53,35 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QNa
 
   @Override
   public String toString() {
-    return "RouteIdentifierImpl{" +
-        "context=" + context +
-        ", type=" + type +
-        ", route=" + route +
-        '}';
+    try {
+      return mapper.writeValueAsString(this);
+    } catch (Throwable e) {
+      //do nothing
+    }
+
+    return super.toString();
+  }
+
+  public RpcRouter.RouteIdentifier fromString(String input)
+      throws Exception {
+
+    JsonNode root = mapper.readTree(input);
+    this.context  = parseQName(root.get("context"));
+    this.type     = parseQName(root.get("type"));
+
+    return this;
+  }
+
+  private QName parseQName(JsonNode node){
+    if (node == null) return null;
+
+    String namespace = (node.get("namespace") != null) ?
+                       node.get("namespace").asText()  : "";
+
+    String localName = (node.get("localName") != null) ?
+                       node.get("localName").asText() : "";
+
+    URI uri = URI.create(namespace);
+    return new QName(uri, localName);
   }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java
new file mode 100644 (file)
index 0000000..7dab2e3
--- /dev/null
@@ -0,0 +1,63 @@
+package org.opendaylight.controller.sal.connector.remoterpc.util;
+
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.NodeUtils;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+public class XmlUtils {
+
+  private static final Logger _logger = LoggerFactory.getLogger(XmlUtils.class);
+
+  public static String compositeNodeToXml(CompositeNode cNode){
+    if (cNode == null) return new String();
+
+    Document domTree = NodeUtils.buildShadowDomTree(cNode);
+    StringWriter writer = new StringWriter();
+    try {
+      TransformerFactory tf = TransformerFactory.newInstance();
+      Transformer transformer = tf.newTransformer();
+      transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+      transformer.transform(new DOMSource(domTree), new StreamResult(writer));
+    } catch (TransformerException e) {
+      _logger.error("Error during translation of Document to OutputStream", e);
+    }
+
+    return writer.toString();
+  }
+
+  public static CompositeNode xmlToCompositeNode(String xml){
+    if (xml==null || xml.length()==0) return null;
+
+    Node<?> dataTree;
+    try {
+      dataTree = XmlTreeBuilder.buildDataTree(new ByteArrayInputStream(xml.getBytes()));
+    } catch (XMLStreamException e) {
+      _logger.error("Error during building data tree from XML", e);
+      return null;
+    }
+    if (dataTree == null) {
+      _logger.error("data tree is null");
+      return null;
+    }
+    if (dataTree instanceof SimpleNode) {
+      _logger.error("RPC XML was resolved as SimpleNode");
+      return null;
+    }
+    return (CompositeNode) dataTree;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala
new file mode 100644 (file)
index 0000000..63b6808
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc
+
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import java.util
+import java.util.{UUID, Collections}
+import org.zeromq.ZMQ
+import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
+import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
+import Message.MessageType
+import java.util.concurrent._
+import java.lang.InterruptedException
+
+
+/**
+ * An implementation of {@link RpcImplementation} that makes
+ * remote RPC calls
+ */
+class Client extends RemoteRpcClient {
+
+  private val _logger = LoggerFactory.getLogger(this.getClass);
+
+  val requestQueue = new LinkedBlockingQueue[MessageWrapper](100)
+  val pool: ExecutorService = Executors.newSingleThreadExecutor()
+  private val TIMEOUT = 5000 //in ms
+  var routingTableProvider: RoutingTableProvider = null
+  
+  
+  def getInstance = this
+
+  
+  def setRoutingTableProvider(provider : RoutingTableProvider) = {
+    routingTableProvider = provider;
+  }
+  
+  def getSupportedRpcs: util.Set[QName] = {
+    Collections.emptySet()
+  }
+
+  def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
+
+    val routeId = new RouteIdentifierImpl()
+    routeId.setType(rpc)
+
+    //lookup address for the rpc request
+    val routingTable = routingTableProvider.getRoutingTable()
+    require( routingTable != null, "Routing table not found. Exiting" )
+
+    val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
+    require(addresses != null, "Address not found for rpc " + rpc);
+    require(addresses.size() == 1) //its a global service.
+
+    val address = addresses.iterator().next()
+    require(address != null, "Address is null")
+
+    //create in-process "pair" socket and pass it to sender thread
+    //Sender replies on this when result is available
+    val inProcAddress = "inproc://" + UUID.randomUUID()
+    val receiver = Context.zmqContext.socket(ZMQ.PAIR)
+    receiver.bind(inProcAddress);
+
+    val sender = Context.zmqContext.socket(ZMQ.PAIR)
+    sender.connect(inProcAddress)
+
+    val requestMessage = new Message.MessageBuilder()
+      .`type`(MessageType.REQUEST)
+      //.sender("tcp://localhost:8081")
+      .recipient(address)
+      .route(routeId)
+      .payload(input)
+      .build()
+
+    _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
+
+    val messageWrapper = new MessageWrapper(requestMessage, sender)
+    val errors = new util.ArrayList[RpcError]
+
+    try {
+      process(messageWrapper)
+      val response = parseMessage(receiver)
+
+      return Rpcs.getRpcResult(
+        true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
+
+    } catch {
+      case e: Exception => {
+        errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
+        return Rpcs.getRpcResult(false, null, errors)
+      }
+    } finally {
+      receiver.close();
+      sender.close();
+    }
+
+  }
+
+  /**
+   * Block on socket for reply
+   * @param receiver
+   * @return
+   */
+  private def parseMessage(receiver:ZMQ.Socket): Message = {
+    val bytes = receiver.recv()
+    return  Message.deserialize(bytes).asInstanceOf[Message]
+  }
+
+  def start() = {
+    pool.execute(new Sender)
+  }
+
+  def process(msg: MessageWrapper) = {
+    _logger.debug("Processing message [{}]", msg)
+    val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
+
+    if (!success) throw new TimeoutException("Queue is full");
+
+  }
+
+  def stop() = {
+    pool.shutdown() //intiate shutdown
+    _logger.debug("Client stopping...")
+    //    Context.zmqContext.term();
+    //    _logger.debug("ZMQ context terminated")
+
+    try {
+
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow();
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+          _logger.error("Client thread pool did not shut down");
+      }
+    } catch {
+      case ie:InterruptedException =>
+        // (Re-)Cancel if current thread also interrupted
+        pool.shutdownNow();
+        // Preserve interrupt status
+        Thread.currentThread().interrupt();
+    }
+    _logger.debug("Client stopped")
+  }
+  
+  def close() = {
+    stop();
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang
new file mode 100644 (file)
index 0000000..d20efe5
--- /dev/null
@@ -0,0 +1,52 @@
+module odl-sal-dom-rpc-remote-cfg {
+       yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc";
+    prefix "rpc-cluster";
+
+    import config { prefix config; revision-date 2013-04-05; }
+    import opendaylight-md-sal-dom {prefix dom;}
+    
+    description
+        "Service definition for Binding Aware MD-SAL.";
+    revision "2013-10-28" {
+        description
+            "Initial revision";
+    }
+
+    identity remote-rpc-server {
+        base config:service-type;
+        config:java-class "org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer";
+    }
+
+    identity remote-rpc-client {
+        base config:service-type;
+        config:java-class "org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient";
+    }
+
+    identity remote-zeromq-rpc-server {
+        base config:module-type;
+        config:provided-service remote-rpc-server;
+        config:provided-service remote-rpc-client;
+        config:java-name-prefix ZeroMQServer;
+    }
+
+    augment "/config:modules/config:module/config:configuration" {
+        case remote-zeromq-rpc-server {
+            when "/config:modules/config:module/config:type = 'remote-zeromq-rpc-server'";
+            
+            container dom-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity dom:dom-broker-osgi-registry;
+                    }
+                }
+            }
+            
+            leaf port {
+                type uint16;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java
new file mode 100644 (file)
index 0000000..2e77537
--- /dev/null
@@ -0,0 +1,56 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+
+import java.util.concurrent.TimeoutException;
+
+public class ClientTest {
+
+    Client client;
+    
+  @Before
+  public void setup(){
+      client = new Client();
+      client.getRequestQueue().clear();
+  }
+
+  @Test
+  public void testStop() throws Exception {
+
+  }
+
+  @Test
+  public void testPool() throws Exception {
+
+  }
+
+  @Test
+  public void process_AddAMessage_ShouldAddToQueue() throws Exception {
+    client.process(getEmptyMessageWrapper());
+    Assert.assertEquals(1, client.getRequestQueue().size());
+  }
+
+  /**
+   * Queue size is 100. Adding 101 message should time out in 2 sec
+   * if server does not process it
+   * @throws Exception
+   */
+  @Test(expected = TimeoutException.class)
+  public void process_Add101Message_ShouldThrow() throws Exception {
+    for (int i=0;i<101;i++){
+      client.process(getEmptyMessageWrapper());
+    }
+  }
+
+  @Test
+  public void testStart() throws Exception {
+  }
+
+  private MessageWrapper getEmptyMessageWrapper(){
+    return new MessageWrapper(new Message(), null);
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java
new file mode 100644 (file)
index 0000000..550d9ef
--- /dev/null
@@ -0,0 +1,50 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.codehaus.jackson.JsonParseException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class RouteIdentifierImplTest {
+
+  Logger _logger = LoggerFactory.getLogger(RouteIdentifierImplTest.class);
+
+  private final URI namespace = URI.create("http://cisco.com/example");
+  private final QName QNAME = new QName(namespace, "heartbeat");
+
+  @Test
+  public void testToString() throws Exception {
+    RouteIdentifierImpl rId = new RouteIdentifierImpl();
+    rId.setType(QNAME);
+
+    _logger.debug(rId.toString());
+
+    Assert.assertTrue(true);
+
+  }
+
+  @Test
+  public void testFromString() throws Exception {
+    RouteIdentifierImpl rId = new RouteIdentifierImpl();
+    rId.setType(QNAME);
+
+    _logger.debug("route: " + rId.fromString(rId.toString()));
+
+    Assert.assertTrue(true);
+  }
+
+  @Test(expected = JsonParseException.class)
+  public void testFromInvalidString() throws Exception {
+    String invalidInput = "aklhdgadfa;;;;;;;]]]]=]ag" ;
+    RouteIdentifierImpl rId = new RouteIdentifierImpl();
+    rId.fromString(invalidInput);
+
+    _logger.debug("" + rId);
+    Assert.assertTrue(true);
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java
new file mode 100644 (file)
index 0000000..e23a3ca
--- /dev/null
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Mockito.doNothing;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RpcSocket.class)
+public class RpcSocketTest {
+  RpcSocket rpcSocket = new RpcSocket("tcp://localhost:5554", new ZMQ.Poller(1));
+  RpcSocket spy = PowerMockito.spy(rpcSocket);
+
+  @Test
+  public void testCreateSocket() throws Exception {
+    Assert.assertEquals("tcp://localhost:5554", spy.getAddress());
+    Assert.assertEquals(ZMQ.REQ, spy.getSocket().getType());
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void send_WhenQueueGetsFull_ShouldThrow() throws Exception {
+
+    doNothing().when(spy).process();
+
+    //10 is queue size
+    for (int i=0;i<10;i++){
+      spy.send(getEmptyMessageWrapper());
+    }
+
+    //sending 11th message should throw
+    spy.send(getEmptyMessageWrapper());
+  }
+
+  @Test
+  public void testHasTimedOut() throws Exception {
+    spy.send(getEmptyMessageWrapper());
+    Assert.assertFalse(spy.hasTimedOut());
+    Thread.sleep(1000);
+    Assert.assertFalse(spy.hasTimedOut());
+    Thread.sleep(1000);
+    Assert.assertTrue(spy.hasTimedOut());
+  }
+
+  @Test
+  public void testProcess() throws Exception {
+    PowerMockito.doNothing().when(spy, "sendMessage");
+    spy.send(getEmptyMessageWrapper());
+
+    //Next message should get queued
+    spy.send(getEmptyMessageWrapper());
+
+    //queue size should be 2
+    Assert.assertEquals(2, spy.getQueueSize());
+
+
+    spy.process();
+    //sleep for 2 secs (timeout)
+    //message send would be retried
+    Thread.sleep(2000);
+    spy.process();
+    Thread.sleep(2000);
+    spy.process();
+    Thread.sleep(2000);
+    spy.process(); //retry fails, next message will get picked up
+    Assert.assertEquals(1, spy.getQueueSize());
+  }
+
+  @Test
+  public void testProcessStateTransitions() throws Exception {
+    PowerMockito.doNothing().when(spy, "sendMessage");
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+    spy.send(getEmptyMessageWrapper());
+    Assert.assertEquals(1, spy.getQueueSize());
+    Thread.sleep(200);
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+    Thread.sleep(1800);
+
+    //1st timeout, 2nd try
+    spy.process();
+    Thread.sleep(200);
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+    Thread.sleep(1800);
+
+    //2nd timeout, 3rd try
+    spy.process();
+    Thread.sleep(200);
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+    Thread.sleep(1800);
+
+    //3rd timeout, no more tries => remove
+    spy.process();
+    Thread.sleep(200);
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+    Assert.assertEquals(0, spy.getQueueSize());
+  }
+
+  @Test
+  public void testParseMessage() throws Exception {
+    // Write an integration test for parseMessage
+  }
+
+  @Test
+  public void testRecycleSocket() throws Exception {
+    // This will need to be updated in the future...for now, recycleSocket() calls close()
+    Assert.assertTrue(spy.getSocket().base().check_tag());
+    spy.close();
+    Assert.assertEquals(10, spy.getSocket().getLinger());
+    Assert.assertFalse(spy.getSocket().base().check_tag());
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    Assert.assertTrue(spy.getSocket().base().check_tag());
+    spy.close();
+    Assert.assertEquals(10, spy.getSocket().getLinger());
+    Assert.assertFalse(spy.getSocket().base().check_tag());
+  }
+
+  @Test
+  public void testReceive() throws Exception {
+    PowerMockito.doReturn(null).when(spy, "parseMessage");
+    PowerMockito.doNothing().when(spy, "process");
+    spy.send(getEmptyMessageWrapper());
+
+    //There should be 1 message waiting in the queue
+    Assert.assertEquals(1, spy.getQueueSize());
+
+    spy.receive();
+    //This should complete message processing
+    //The message should be removed from the queue
+    Assert.assertEquals(0, spy.getQueueSize());
+    Assert.assertEquals(RpcSocket.NUM_RETRIES, spy.getRetriesLeft());
+
+  }
+
+  @Test
+  public void testReceiveStateTransitions() throws Exception {
+    PowerMockito.doReturn(null).when(spy, "parseMessage");
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+    spy.send(getEmptyMessageWrapper());
+
+    //There should be 1 message waiting in the queue
+    Assert.assertEquals(1, spy.getQueueSize());
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+
+    spy.receive();
+    //This should complete message processing
+    //The message should be removed from the queue
+    Assert.assertEquals(0, spy.getQueueSize());
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+  }
+
+  private MessageWrapper getEmptyMessageWrapper(){
+    return new MessageWrapper(new Message(), null);
+  }
+
+  @Test
+  public void testProcessReceiveSequence() throws Exception {
+    PowerMockito.doNothing().when(spy, "sendMessage");
+    PowerMockito.doReturn(null).when(spy, "parseMessage");
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+    spy.send(getEmptyMessageWrapper());
+    spy.send(getEmptyMessageWrapper());
+    Assert.assertEquals(2, spy.getQueueSize());
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+
+
+    Thread.sleep(2000);
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+    spy.receive();
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+    Assert.assertEquals(1, spy.getQueueSize());
+
+    spy.process();
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+    spy.receive();
+    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+    Assert.assertEquals(0, spy.getQueueSize());
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java
new file mode 100644 (file)
index 0000000..36a4acd
--- /dev/null
@@ -0,0 +1,79 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.NodeUtils;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.StringWriter;
+
+public class SerilizationTest {
+
+  private static final Logger _logger = LoggerFactory.getLogger(SerilizationTest.class);
+
+  public void fromXml() {
+  }
+
+  @Test
+  public void toXml() throws FileNotFoundException {
+
+    InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+    StringWriter writer = new StringWriter();
+
+    CompositeNode data = loadCompositeNode(xmlStream);
+    Document domTree = NodeUtils.buildShadowDomTree(data);
+    try {
+      TransformerFactory tf = TransformerFactory.newInstance();
+      Transformer transformer = tf.newTransformer();
+      transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+      //transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+      //transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+      //transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+      //transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+      transformer.transform(new DOMSource(domTree), new StreamResult(writer));
+    } catch (TransformerException e) {
+      _logger.error("Error during translation of Document to OutputStream", e);
+    }
+
+    _logger.info("Parsed xml [{}]", writer.toString());
+  }
+
+  //Note to self:  Stolen from TestUtils
+  ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
+  // Figure out how to include TestUtils through pom ...was getting errors
+  private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
+    if (xmlInputStream == null) {
+      throw new IllegalArgumentException();
+    }
+    Node<?> dataTree;
+    try {
+      dataTree = XmlTreeBuilder.buildDataTree(xmlInputStream);
+    } catch (XMLStreamException e) {
+      _logger.error("Error during building data tree from XML", e);
+      return null;
+    }
+    if (dataTree == null) {
+      _logger.error("data tree is null");
+      return null;
+    }
+    if (dataTree instanceof SimpleNode) {
+      _logger.error("RPC XML was resolved as SimpleNode");
+      return null;
+    }
+    return (CompositeNode) dataTree;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java
new file mode 100644 (file)
index 0000000..130b30d
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.zeromq.ZMQ;
+import org.opendaylight.controller.sal.connector.remoterpc.SocketManager;
+import org.opendaylight.controller.sal.connector.remoterpc.RpcSocket;
+import org.opendaylight.controller.sal.connector.remoterpc.Context;
+import org.junit.Test;
+
+public class SocketManagerTest {
+
+  SocketManager manager;
+
+  @Before
+  public void setup(){
+    manager = new SocketManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    manager.close();
+  }
+
+  @Test
+  public void getManagedSockets_When2NewAdded_ShouldContain2() throws Exception {
+
+    //Prepare data
+    manager.getManagedSocket("tcp://localhost:5554");
+    manager.getManagedSocket("tcp://localhost:5555");
+
+    Assert.assertTrue( 2 == manager.getManagedSockets().size());
+  }
+
+  @Test
+  public void getManagedSockets_When2NewAddedAnd1Existing_ShouldContain2() throws Exception {
+
+    //Prepare data
+    manager.getManagedSocket("tcp://localhost:5554");
+    manager.getManagedSocket("tcp://localhost:5555");
+    manager.getManagedSocket("tcp://localhost:5554"); //ask for the first one
+
+    Assert.assertTrue( 2 == manager.getManagedSockets().size());
+  }
+
+  @Test
+  public void getManagedSocket_WhenPassedAValidAddress_ShouldReturnARpcSocket() throws Exception {
+    String testAddress = "tcp://localhost:5554";
+    RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
+    Assert.assertEquals(testAddress, rpcSocket.getAddress());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getManagedSocket_WhenPassedInvalidHostAddress_ShouldThrow() throws Exception {
+    String testAddress = "tcp://nonexistenthost:5554";
+    RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getManagedSocket_WhenPassedInvalidAddress_ShouldThrow() throws Exception {
+    String testAddress = "xxx";
+    RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
+  }
+
+  @Test
+  public void getManagedSocket_WhenPassedAValidZmqSocket_ShouldReturnARpcSocket() throws Exception {
+    //Prepare data
+    String firstAddress = "tcp://localhost:5554";
+    RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
+    ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
+
+    String secondAddress = "tcp://localhost:5555";
+    RpcSocket secondRpcSocket = manager.getManagedSocket(secondAddress);
+    ZMQ.Socket secondZmqSocket = secondRpcSocket.getSocket();
+
+    Assert.assertEquals(firstRpcSocket, manager.getManagedSocketFor(firstZmqSocket).get());
+    Assert.assertEquals(secondRpcSocket, manager.getManagedSocketFor(secondZmqSocket).get());
+  }
+
+  @Test
+  public void getManagedSocket_WhenPassedNonManagedZmqSocket_ShouldReturnNone() throws Exception {
+    ZMQ.Socket nonManagedSocket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
+    nonManagedSocket.connect("tcp://localhost:5000");
+
+    //Prepare data
+    String firstAddress = "tcp://localhost:5554";
+    RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
+    ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
+
+    Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(nonManagedSocket) );
+    Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(null) );
+  }
+
+  @Test
+  public void stop_WhenCalled_ShouldEmptyManagedSockets() throws Exception {
+    manager.getManagedSocket("tcp://localhost:5554");
+    manager.getManagedSocket("tcp://localhost:5555");
+    Assert.assertTrue( 2 == manager.getManagedSockets().size());
+
+    manager.close();
+    Assert.assertTrue( 0 == manager.getManagedSockets().size());
+  }
+
+  @Test
+  public void poller_WhenCalled_ShouldReturnAnInstanceOfPoller() throws Exception {
+    Assert.assertTrue (manager.getPoller() instanceof ZMQ.Poller);
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/FourSimpleChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/FourSimpleChildren.xml
new file mode 100644 (file)
index 0000000..5ac991b
--- /dev/null
@@ -0,0 +1,6 @@
+<rpc>
+    <name>eth0</name>
+    <type>ethernetCsmacd</type>
+    <enabled>false</enabled>
+    <description>some interface</description>
+</rpc>
@@ -2,11 +2,11 @@
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
-        <artifactId>sal-test-parent</artifactId>
+        <artifactId>sal-remoterpc-connector-test-parent</artifactId>
         <groupId>org.opendaylight.controller.tests</groupId>
         <version>1.0-SNAPSHOT</version>
     </parent>
-    <artifactId>zeromq-test-consumer</artifactId>
+    <artifactId>sal-remoterpc-connector-test-consumer</artifactId>
     <packaging>bundle</packaging>
     <scm>
         <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
                 <configuration>
                     <instructions>
                         <Bundle-Activator>org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer</Bundle-Activator>
-                        <Import-Package>
-                            org.opendaylight.controller.sal.core.api,
-                            org.opendaylight.yangtools.yang.common;version="[0.5,1)",
-                            org.opendaylight.yangtools.yang.data.api,
-                        </Import-Package>
                     </instructions>
                 </configuration>
             </plugin>
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>yang-data-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-impl</artifactId>
+            <version>0.5.9-SNAPSHOT</version>
+        </dependency>
 
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java
new file mode 100644 (file)
index 0000000..87078ea
--- /dev/null
@@ -0,0 +1,122 @@
+package org.opendaylight.controller.sample.zeromq.consumer;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Hashtable;
+import java.util.concurrent.*;
+
+import org.opendaylight.controller.sal.core.api.AbstractConsumer;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+
+import javax.xml.stream.XMLStreamException;
+
+public class ExampleConsumer extends AbstractConsumer {
+
+  private final URI namespace = URI.create("http://cisco.com/example");
+  private final QName QNAME = new QName(namespace, "heartbeat");
+
+  private ConsumerSession session;
+
+  private ServiceRegistration<ExampleConsumer> thisReg;
+  private Logger _logger = LoggerFactory.getLogger(ExampleConsumer.class);
+
+  @Override
+  public void onSessionInitiated(ConsumerSession session) {
+    this.session = session;
+  }
+
+  public RpcResult<CompositeNode> invokeRpc(QName qname, CompositeNode input) {
+    _logger.info("Invoking RPC:[{}] with Input:[{}]", qname.getLocalName(), input);
+    RpcResult<CompositeNode> result = null;
+    Future<RpcResult<CompositeNode>> future = ExampleConsumer.this.session.rpc(qname, input);
+    try {
+      result = future.get();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    _logger.info("Returning Result:[{}]", result);
+    return result;
+  }
+
+  @Override
+  protected void startImpl(BundleContext context){
+    thisReg = context.registerService(ExampleConsumer.class, this, new Hashtable<String,String>());
+  }
+  @Override
+  protected void stopImpl(BundleContext context) {
+    super.stopImpl(context);
+    thisReg.unregister();
+  }
+
+  public CompositeNode getValidCompositeNodeWithOneSimpleChild() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/OneSimpleChild.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  public CompositeNode getValidCompositeNodeWithTwoSimpleChildren() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/TwoSimpleChildren.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  public CompositeNode getValidCompositeNodeWithFourSimpleChildren() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/FourSimpleChildren.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  public CompositeNode getValidCompositeNodeWithOneSimpleOneCompositeChild() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/OneSimpleOneCompositeChild.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  public CompositeNode getValidCompositeNodeWithTwoCompositeChildren() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/TwoCompositeChildren.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  public CompositeNode getInvalidCompositeNodeSimpleChild() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/InvalidSimpleChild.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  public CompositeNode getInvalidCompositeNodeCompositeChild() throws FileNotFoundException {
+    InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/InvalidCompositeChild.xml");
+    return loadCompositeNode(xmlStream);
+  }
+
+  //Note to self:  Stolen from TestUtils
+  ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
+  // Figure out how to include TestUtils through pom ...was getting errors
+  private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
+    if (xmlInputStream == null) {
+      throw new IllegalArgumentException();
+    }
+    Node<?> dataTree;
+    try {
+      dataTree = XmlTreeBuilder.buildDataTree(xmlInputStream);
+    } catch (XMLStreamException e) {
+      _logger.error("Error during building data tree from XML", e);
+      return null;
+    }
+    if (dataTree == null) {
+      _logger.error("data tree is null");
+      return null;
+    }
+    if (dataTree instanceof SimpleNode) {
+      _logger.error("RPC XML was resolved as SimpleNode");
+      return null;
+    }
+    return (CompositeNode) dataTree;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/FourSimpleChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/FourSimpleChildren.xml
new file mode 100644 (file)
index 0000000..5ac991b
--- /dev/null
@@ -0,0 +1,6 @@
+<rpc>
+    <name>eth0</name>
+    <type>ethernetCsmacd</type>
+    <enabled>false</enabled>
+    <description>some interface</description>
+</rpc>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidCompositeChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidCompositeChild.xml
new file mode 100644 (file)
index 0000000..3979d02
--- /dev/null
@@ -0,0 +1,14 @@
+<rpc>
+    <innerinterface1>
+        <name>eth1</name>
+        <type>ethernet</type>
+        <enabled>false</enabled>
+        <description>some interface</description>
+    </innerinterface1>
+    <innerinterface2>
+        <name>error</name>
+        <type>ethernet</type>
+        <enabled>true</enabled>
+        <description>some interface</description>
+    </innerinterface2>
+</rpc>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidSimpleChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidSimpleChild.xml
new file mode 100644 (file)
index 0000000..6082d72
--- /dev/null
@@ -0,0 +1,3 @@
+<rpc>
+    <name>error</name>
+</rpc>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleChild.xml
new file mode 100644 (file)
index 0000000..f431b04
--- /dev/null
@@ -0,0 +1,3 @@
+<rpc>
+    <name>eth0</name>
+</rpc>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleOneCompositeChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleOneCompositeChild.xml
new file mode 100644 (file)
index 0000000..bca7682
--- /dev/null
@@ -0,0 +1,9 @@
+<rpc>
+    <name>eth0</name>
+    <innerinterface>
+        <name>eth1</name>
+        <type>ethernetCsmacd</type>
+        <enabled>false</enabled>
+        <description>some interface</description>
+    </innerinterface>
+</rpc>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoCompositeChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoCompositeChildren.xml
new file mode 100644 (file)
index 0000000..c49407e
--- /dev/null
@@ -0,0 +1,14 @@
+<rpc>
+    <innerinterface1>
+        <name>eth1</name>
+        <type>ethernet</type>
+        <enabled>false</enabled>
+        <description>some interface</description>
+    </innerinterface1>
+    <innerinterface2>
+        <name>eth2</name>
+        <type>ethernet</type>
+        <enabled>true</enabled>
+        <description>some interface</description>
+    </innerinterface2>
+</rpc>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoSimpleChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoSimpleChildren.xml
new file mode 100644 (file)
index 0000000..5f4729c
--- /dev/null
@@ -0,0 +1,4 @@
+<rpc>
+    <name>eth0</name>
+    <type>ethernetCsmacd</type>
+</rpc>
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/pom.xml
new file mode 100644 (file)
index 0000000..5bfbcba
--- /dev/null
@@ -0,0 +1,26 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../..</relativePath>
+  </parent>
+  <packaging>pom</packaging>
+  <groupId>org.opendaylight.controller.tests</groupId>
+  <artifactId>sal-remoterpc-connector-test-parent</artifactId>
+  <scm>
+    <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+    <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+    <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+  </scm>
+
+  <modules>
+    <module>consumer-service</module>
+    <module>provider-service</module>
+    <module>test-it</module>
+    <module>test-nb</module>
+  </modules>
+
+</project>
@@ -2,11 +2,11 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-        <artifactId>sal-test-parent</artifactId>
+        <artifactId>sal-remoterpc-connector-test-parent</artifactId>
         <groupId>org.opendaylight.controller.tests</groupId>
         <version>1.0-SNAPSHOT</version>
   </parent>
-  <artifactId>zeromq-test-provider</artifactId>
+  <artifactId>sal-remoterpc-connector-test-provider</artifactId>
   <packaging>bundle</packaging>
   <scm>
     <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-api</artifactId>
     </dependency>
-
+        <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-impl</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-common-util</artifactId>
@@ -78,7 +81,7 @@
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-zeromq-connector</artifactId>
+      <artifactId>sal-remoterpc-connector</artifactId>
       <version>1.0-SNAPSHOT</version>
     </dependency>
 
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java
new file mode 100644 (file)
index 0000000..6c294dd
--- /dev/null
@@ -0,0 +1,120 @@
+package org.opendaylight.controller.sample.zeromq.provider;
+
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.*;
+
+public class ExampleProvider extends AbstractProvider implements RpcImplementation {
+
+  private final URI namespace = URI.create("http://cisco.com/example");
+  private final QName QNAME = new QName(namespace, "heartbeat");
+  private RpcRegistration reg;
+
+  private ServiceRegistration thisReg;
+
+  private ProviderSession session;
+  private Logger _logger = LoggerFactory.getLogger(ExampleProvider.class);
+
+  @Override
+  public void onSessionInitiated(ProviderSession session) {
+    this.session = session;
+  }
+
+  @Override
+  public Set<QName> getSupportedRpcs() {
+    Set<QName> supportedRpcs = new HashSet<QName>();
+    supportedRpcs.add(QNAME);
+    return supportedRpcs;
+  }
+
+  @Override
+  public RpcResult<CompositeNode> invokeRpc(final QName rpc, CompositeNode input) {
+    boolean success = false;
+    CompositeNode output = null;
+    Collection<RpcError> errors = new ArrayList<>();
+
+    // Only handle supported RPC calls
+    if (getSupportedRpcs().contains(rpc))  {
+      if (input == null) {
+        errors.add(RpcErrors.getRpcError("app", "tag", "info", RpcError.ErrorSeverity.WARNING, "message:null input", RpcError.ErrorType.RPC, null));
+      }
+      else {
+        if (isErroneousInput(input)) {
+          errors.add(RpcErrors.getRpcError("app", "tag", "info", RpcError.ErrorSeverity.ERROR, "message:error", RpcError.ErrorType.RPC, null));
+        }
+        else {
+          success = true;
+          output = addSuccessNode(input);
+        }
+      }
+    }
+    return Rpcs.getRpcResult(success, output, errors);
+  }
+
+  // Examines input -- dives into CompositeNodes and finds any value equal to "error"
+  private boolean isErroneousInput(CompositeNode input) {
+    for (Node<?> n : input.getChildren()) {
+      if (n instanceof CompositeNode) {
+        if (isErroneousInput((CompositeNode)n)) {
+          return true;
+        }
+      }
+      else {  //SimpleNode
+        if ((input.getChildren().get(0).getValue()).equals("error")) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  
+  // Adds a child SimpleNode containing the value "success" to the input CompositeNode
+  private CompositeNode addSuccessNode(CompositeNode input) {
+    List<Node<?>> list = new ArrayList<Node<?>>(input.getChildren());
+    SimpleNodeTOImpl<String> simpleNode = new SimpleNodeTOImpl<String>(QNAME, input, "success");
+    list.add(simpleNode);
+    return new CompositeNodeTOImpl(QNAME, null, list);
+  }
+
+  @Override
+  protected void startImpl(BundleContext context) {
+    thisReg = context.registerService(ExampleProvider.class, this, new Hashtable<String, String>());
+  }
+
+  @Override
+  protected void stopImpl(BundleContext context) {
+    if (reg != null) {
+      try {
+        reg.close();
+        thisReg.unregister();
+      } catch (Exception e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public void announce(QName name) {
+    _logger.debug("Announcing [{}]\n\n\n", name);
+    reg = this.session.addRpcImplementation(name, this);
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/pom.xml
new file mode 100644 (file)
index 0000000..4305a28
--- /dev/null
@@ -0,0 +1,536 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>sal-remoterpc-connector-test-parent</artifactId>
+        <groupId>org.opendaylight.controller.tests</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>sal-remoterpc-connector-test-it</artifactId>
+    <scm>
+        <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+        <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+    </scm>
+
+    <properties>
+        <exam.version>3.0.0</exam.version>
+        <url.version>1.5.0</url.version>
+        <config.version>0.2.3-SNAPSHOT</config.version>
+        <netconf.version>0.2.3-SNAPSHOT</netconf.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>commons-codec</groupId>
+                <artifactId>commons-codec</artifactId>
+                <version>1.7</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.ops4j.pax.exam</groupId>
+                <artifactId>maven-paxexam-plugin</artifactId>
+                <version>1.2.4</version>
+                <executions>
+                    <execution>
+                        <id>generate-config</id>
+                        <goals>
+                            <goal>generate-depends-file</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <pluginManagement>
+            <plugins>
+                <!--This plugin's configuration is used to store Eclipse 
+                    m2e settings only. It has no influence on the Maven build itself. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>
+                                            org.ops4j.pax.exam
+                                        </groupId>
+                                        <artifactId>
+                                            maven-paxexam-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [1.2.4,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>
+                                                generate-depends-file
+                                            </goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+            <artifactId>xtend-lib-osgi</artifactId>
+            <version>2.4.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller.tests</groupId>
+            <artifactId>sal-remoterpc-connector-test-provider</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller.tests</groupId>
+            <artifactId>sal-remoterpc-connector-test-consumer</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-broker-impl</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-container-native</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-junit4</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-link-mvn</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.url</groupId>
+            <artifactId>pax-url-aether</artifactId>
+            <version>1.5.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>equinoxSDK381</groupId>
+            <artifactId>org.eclipse.osgi</artifactId>
+            <version>3.8.1.v20120830-144521</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-core-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-remoterpc-connector</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>containermanager</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.osgi</groupId>
+                    <artifactId>org.osgi.compendium</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-binding</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-api</artifactId>
+        </dependency>
+        <!--dependency> <groupId>org.opendaylight.yangtools</groupId> <artifactId>yang-data-impl</artifactId> 
+            <version>0.5.9-SNAPSHOT</version> </dependency -->
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-parser-impl</artifactId>
+            <version>0.5.9-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+            <artifactId>antlr4-runtime-osgi-nohead</artifactId>
+            <version>4.0</version>
+        </dependency>
+
+        <!-- routing table dependencies -->
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>zeromq-routingtable.implementation</artifactId>
+            <version>0.4.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>clustering.services</artifactId>
+            <version>0.4.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.osgi</groupId>
+                    <artifactId>org.osgi.compendium</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal.implementation</artifactId>
+            <version>0.4.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>containermanager</artifactId>
+            <version>0.5.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.osgi</groupId>
+                    <artifactId>org.osgi.compendium</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>containermanager.it.implementation</artifactId>
+            <version>0.5.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>clustering.stub</artifactId>
+            <version>0.4.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.dependencymanager.shell</artifactId>
+            <version>3.0.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.osgi</groupId>
+                    <artifactId>org.osgi.compendium</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>eclipselink</groupId>
+            <artifactId>javax.resource</artifactId>
+            <version>1.5.0.v200906010428</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>ietf-netconf-monitoring</artifactId>
+        <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-binding</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.model</groupId>
+            <artifactId>yang-ext</artifactId>
+            <version>2013.09.07.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.model</groupId>
+            <artifactId>opendaylight-l2-types</artifactId>
+            <version>2013.08.27.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-it</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-config</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-broker-impl</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-broker-impl</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.controller.model</groupId>
+            <artifactId>model-inventory</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-connector-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>clustering.services</artifactId>
+            <version>0.4.1-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>equinoxSDK381</groupId>
+            <artifactId>org.eclipse.osgi</artifactId>
+            <version>3.8.1.v20120830-144521</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>1.9.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.zeromq</groupId>
+            <artifactId>jeromq</artifactId>
+            <version>0.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+            <artifactId>xtend-lib-osgi</artifactId>
+            <version>2.4.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-broker-impl</artifactId>
+            <version>1.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-container-native</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-junit4</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>config-netconf-connector</artifactId>
+            <version>${netconf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>yang-store-impl</artifactId>
+            <version>${config.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>logback-config</artifactId>
+            <version>${config.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>config-persister-impl</artifactId>
+            <version>${config.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>config-persister-file-adapter</artifactId>
+            <version>${config.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>netconf-impl</artifactId>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>netconf-client</artifactId>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam</artifactId>
+            <version>${exam.version}</version>
+            <!-- Compile scope here is intentional, it is used in TestHelper 
+                class which could be downloaded via nexus and reused in other integration 
+                tests. -->
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-link-mvn</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>equinoxSDK381</groupId>
+            <artifactId>org.eclipse.osgi</artifactId>
+            <version>3.8.1.v20120830-144521</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller.model</groupId>
+            <artifactId>model-flow-service</artifactId>
+            <version>1.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>config-manager</artifactId>
+            <version>0.2.3-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller.model</groupId>
+            <artifactId>model-flow-management</artifactId>
+            <version>1.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+            <artifactId>antlr4-runtime-osgi-nohead</artifactId>
+            <version>4.0</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java
new file mode 100644 (file)
index 0000000..62c094d
--- /dev/null
@@ -0,0 +1,456 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sample.zeromq.test.it;
+
+import junit.framework.Assert;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.connector.remoterpc.Client;
+import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient;
+import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer;
+import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
+import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
+import org.opendaylight.controller.test.sal.binding.it.TestHelper;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.util.Filter;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import javax.inject.Inject;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Hashtable;
+
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
+import static org.ops4j.pax.exam.CoreOptions.*;
+
+@RunWith(PaxExam.class)
+public class RouterTest {
+
+  private Logger _logger = LoggerFactory.getLogger(RouterTest.class);
+
+  public static final String ODL = "org.opendaylight.controller";
+  public static final String YANG = "org.opendaylight.yangtools";
+  public static final String SAMPLE = "org.opendaylight.controller.tests";
+  private final URI namespace = URI.create("http://cisco.com/example");
+  private final QName QNAME = new QName(namespace, "heartbeat");
+
+
+  @Inject
+  org.osgi.framework.BundleContext ctx;
+
+  @Inject
+  @Filter(timeout=60*1000)
+  Broker broker;
+  
+  private ZMQ.Context zmqCtx = ZMQ.context(1);
+  //private Server router;
+  //private ExampleProvider provider;
+
+  //@Test
+  public void testInvokeRpc() throws Exception{
+    //Thread.sleep(1000);
+    //Send announcement
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+
+    _logger.debug("Provider sends announcement [{}]", "heartbeat");
+    provider.announce(QNAME);
+    ServiceReference routerRef = ctx.getServiceReference(Client.class);
+    Client router = (Client) ctx.getService(routerRef);
+    _logger.debug("Found router[{}]", router);
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = router.invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild());
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertNotNull(result);
+    }
+  }
+
+  @Test
+  public void testInvokeRpcWithValidSimpleNode() throws Exception{
+    //Thread.sleep(1500);
+
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+    // Provider sends announcement
+    _logger.debug("Provider sends announcement [{}]", "heartbeat");
+    provider.announce(QNAME);
+    // Consumer invokes RPC
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    CompositeNode input = consumer.getValidCompositeNodeWithOneSimpleChild();
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+      Assert.assertNotNull(result);
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertTrue(result.isSuccessful());
+      Assert.assertNotNull(result.getResult());
+      Assert.assertEquals(0, result.getErrors().size());
+      Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size());
+    }
+  }
+
+  @Test
+  public void testInvokeRpcWithValidSimpleNodes() throws Exception{
+    //Thread.sleep(1500);
+
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+    // Provider sends announcement
+    _logger.debug("Provider sends announcement [{}]", "heartbeat");
+    provider.announce(QNAME);
+    // Consumer invokes RPC
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    CompositeNode input = consumer.getValidCompositeNodeWithFourSimpleChildren();
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+      Assert.assertNotNull(result);
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertTrue(result.isSuccessful());
+      Assert.assertNotNull(result.getResult());
+      Assert.assertEquals(0, result.getErrors().size());
+      Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size());
+    }
+  }
+
+  @Test
+  public void testInvokeRpcWithValidCompositeNode() throws Exception{
+    //Thread.sleep(1500);
+
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+    // Provider sends announcement
+    _logger.debug("Provider sends announcement [{}]", "heartbeat");
+    provider.announce(QNAME);
+    // Consumer invokes RPC
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    CompositeNode input = consumer.getValidCompositeNodeWithTwoCompositeChildren();
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+      Assert.assertNotNull(result);
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertTrue(result.isSuccessful());
+      Assert.assertNotNull(result.getResult());
+      Assert.assertEquals(0, result.getErrors().size());
+      Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size());
+    }
+  }
+
+  @Test
+  public void testInvokeRpcWithNullInput() throws Exception{
+    //Thread.sleep(1500);
+
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+    // Provider sends announcement
+    _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName());
+    provider.announce(QNAME);
+    // Consumer invokes RPC
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, null);
+      Assert.assertNotNull(result);
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertFalse(result.isSuccessful());
+      Assert.assertNull(result.getResult());
+      Assert.assertEquals(1, result.getErrors().size());
+      Assert.assertEquals(RpcError.ErrorSeverity.WARNING, ((RpcError)result.getErrors().toArray()[0]).getSeverity());
+    }
+  }
+
+  @Test
+  public void testInvokeRpcWithInvalidSimpleNode() throws Exception{
+    //Thread.sleep(1500);
+
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+    // Provider sends announcement
+    _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName());
+    provider.announce(QNAME);
+    // Consumer invokes RPC
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    CompositeNode input = consumer.getInvalidCompositeNodeSimpleChild();
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+      Assert.assertNotNull(result);
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertFalse(result.isSuccessful());
+      Assert.assertNull(result.getResult());
+      Assert.assertEquals(1, result.getErrors().size());
+      Assert.assertEquals(RpcError.ErrorSeverity.ERROR, ((RpcError)result.getErrors().toArray()[0]).getSeverity());
+    }
+  }
+
+  @Test
+  public void testInvokeRpcWithInvalidCompositeNode() throws Exception{
+    //Thread.sleep(1500);
+
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    Assert.assertNotNull(providerRef);
+    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+    Assert.assertNotNull(provider);
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    Assert.assertNotNull(consumerRef);
+    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+    Assert.assertNotNull(consumer);
+
+    // Provider sends announcement
+    _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName());
+    provider.announce(QNAME);
+    // Consumer invokes RPC
+    _logger.debug("Invoking RPC [{}]", QNAME);
+    CompositeNode input = consumer.getInvalidCompositeNodeCompositeChild();
+    for (int i = 0; i < 3; i++) {
+      RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+      Assert.assertNotNull(result);
+      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+      Assert.assertFalse(result.isSuccessful());
+      Assert.assertNull(result.getResult());
+      Assert.assertEquals(1, result.getErrors().size());
+      Assert.assertEquals(RpcError.ErrorSeverity.ERROR, ((RpcError)result.getErrors().toArray()[0]).getSeverity());
+    }
+  }
+
+  //@Test
+  // This method is UNTESTED -- need to get around the bundling issues before I know if this even work
+//  public void testInvokeRpcWithValidCompositeNode() throws Exception{
+//    Thread.sleep(10000);
+//    //Send announcement
+//    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+//    Assert.assertNotNull(providerRef);
+//
+//    ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+//    Assert.assertNotNull(provider);
+//
+//    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+//    Assert.assertNotNull(consumerRef);
+//
+//    ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+//    Assert.assertNotNull(consumer);
+//
+//    _logger.debug("Provider sends announcement [{}]", "heartbeat");
+//    provider.announce(QNAME);
+//    ServiceReference routerRef = ctx.getServiceReference(Client.class);
+//    Client router = (Client) ctx.getService(routerRef);
+//    _logger.debug("Found router[{}]", router);
+//    _logger.debug("Invoking RPC [{}]", QNAME);
+//    for (int i = 0; i < 3; i++) {
+//      RpcResult<CompositeNode> result = router.getInstance().invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild());
+//      _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+//      Assert.assertNotNull(result);
+//    }
+//  }
+
+  private Message send(Message msg) throws IOException {
+    ZMQ.Socket reqSocket = zmqCtx.socket(ZMQ.REQ);
+    reqSocket.connect("tcp://localhost:5555");
+    reqSocket.send(Message.serialize(msg));
+    Message response = parseMessage(reqSocket);
+
+    return response;
+  }
+
+  /**
+   * @param socket
+   * @return
+   */
+  private Message parseMessage(ZMQ.Socket socket) {
+
+    Message msg = null;
+    try {
+      byte[] bytes = socket.recv();
+      _logger.debug("Received bytes:[{}]", bytes.length);
+      msg = (Message) Message.deserialize(bytes);
+    } catch (Throwable t) {
+      t.printStackTrace();
+    }
+    return msg;
+  }
+
+  
+  private void printState(){
+    Bundle[] b = ctx.getBundles();
+    _logger.debug("\n\nNumber of bundles [{}]\n\n]", b.length);
+    for (int i=0;i<b.length;i++){
+      _logger.debug("Bundle States {}-{} ",b[i].getSymbolicName(), stateToString(b[i].getState()));
+
+      if ( Bundle.INSTALLED == b[i].getState() || (Bundle.RESOLVED == b[i].getState())){
+        try {
+          b[i].start();
+        } catch (BundleException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+  private String stateToString(int state) {
+    switch (state) {
+      case Bundle.ACTIVE:
+        return "ACTIVE";
+      case Bundle.INSTALLED:
+        return "INSTALLED";
+      case Bundle.RESOLVED:
+        return "RESOLVED";
+      case Bundle.UNINSTALLED:
+        return "UNINSTALLED";
+      default:
+        return "Not CONVERTED";
+    }
+  }
+
+  @Configuration
+  public Option[] config() {
+    return options(systemProperty("osgi.console").value("2401"),
+        systemProperty("rpc.port").value("5555"),
+        mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+        mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+        mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+        mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+
+        //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+        mavenBundle(ODL, "sal-common").versionAsInProject(), //
+        mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+        mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+        mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+        mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+        mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+        mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+        mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+
+
+        
+        baseModelBundles(),
+        bindingAwareSalBundles(),
+        TestHelper.bindingIndependentSalBundles(),
+        TestHelper.configMinumumBundles(),
+        TestHelper.mdSalCoreBundles(),
+        
+      //Added the consumer
+        mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), //
+      //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing
+      //**** This causes the "Message" error to occur, the class cannot be found
+        mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), //
+        mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), //
+
+        mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(),
+        mavenBundle(YANG, "concepts").versionAsInProject(),
+        mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+        mavenBundle(YANG, "yang-common").versionAsInProject(), //
+        mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+        mavenBundle(YANG, "yang-data-impl").versionAsInProject(), //
+        mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+        mavenBundle(YANG, "yang-parser-api").versionAsInProject(), //
+        mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), //
+        mavenBundle(YANG, "yang-model-util").versionAsInProject(), //
+        mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+        mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), //
+        mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+        mavenBundle("org.zeromq", "jeromq").versionAsInProject(),
+        mavenBundle("org.codehaus.jackson", "jackson-mapper-asl").versionAsInProject(),
+        mavenBundle("org.codehaus.jackson", "jackson-core-asl").versionAsInProject(),
+        //routingtable dependencies
+        systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
+        // List framework bundles
+        mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(),
+        mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(),
+        mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(),
+        mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(),
+        mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(),
+        mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(),
+        mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(),
+        // List logger bundles
+
+        mavenBundle("org.opendaylight.controller", "clustering.services")
+            .versionAsInProject(),
+        mavenBundle("org.opendaylight.controller", "clustering.stub")
+            .versionAsInProject(),
+
+
+        // List all the bundles on which the test case depends
+        mavenBundle("org.opendaylight.controller", "sal")
+            .versionAsInProject(),
+        mavenBundle("org.opendaylight.controller", "sal.implementation")
+            .versionAsInProject(),
+        mavenBundle("org.jboss.spec.javax.transaction",
+            "jboss-transaction-api_1.1_spec").versionAsInProject(),
+        mavenBundle("org.apache.commons", "commons-lang3")
+            .versionAsInProject(),
+        mavenBundle("org.apache.felix",
+            "org.apache.felix.dependencymanager")
+            .versionAsInProject(),
+
+        junitBundles()
+    );
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/controller.config b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/controller.config
new file mode 100644 (file)
index 0000000..0d9cd6a
--- /dev/null
@@ -0,0 +1,123 @@
+//START OF CONFIG-LAST
+<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:schema-service-singleton</type>
+               <name>yang-schema-service</name>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:hash-map-data-store</type>
+               <name>hash-map-data-store</name>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-broker-impl</type>
+               <name>dom-broker</name>
+               <data-store xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">
+                       <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-data-store</type>
+                       <name>ref_hash-map-data-store</name>
+               </data-store>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-broker-impl</type>
+               <name>binding-broker-impl</name>
+               <notification-service xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+                       <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-notification-service</type>
+                       <name>ref_binding-notification-broker</name>
+               </notification-service>
+               <data-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+                       <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
+                       <name>ref_binding-data-broker</name>
+               </data-broker>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:runtime-generated-mapping</type>
+               <name>runtime-mapping-singleton</name>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-notification-broker</type>
+               <name>binding-notification-broker</name>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-data-broker</type>
+               <name>binding-data-broker</name>
+               <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+                       <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+                       <name>ref_dom-broker</name>
+               </dom-broker>
+               <mapping-service xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+               <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">binding:binding-dom-mapping-service</type>
+               <name>ref_runtime-mapping-singleton</name>
+               </mapping-service>
+       </module>
+       <module>
+               <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">prefix:remote-zeromq-rpc-server</type>
+               <name>remoter</name>
+               <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">5666</port>
+               <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">
+                       <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+                       <name>ref_dom-broker</name>
+               </dom-broker>
+       </module>
+</modules>
+<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+       <service>
+       <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
+               <instance>
+               <name>ref_yang-schema-service</name>
+               <provider>/config/modules/module[name='schema-service-singleton']/instance[name='yang-schema-service']</provider>
+               </instance>
+       </service>
+       <service>
+               <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-notification-service</type>
+               <instance>
+                       <name>ref_binding-notification-broker</name>
+                       <provider>/config/modules/module[name='binding-notification-broker']/instance[name='binding-notification-broker']</provider>
+               </instance>
+       </service>
+       <service>
+               <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-data-store</type>
+               <instance>
+                       <name>ref_hash-map-data-store</name>
+                       <provider>/config/modules/module[name='hash-map-data-store']/instance[name='hash-map-data-store']</provider>
+               </instance>
+       </service>
+       <service>
+               <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-broker-osgi-registry</type>
+               <instance>
+                       <name>ref_binding-broker-impl</name>
+                       <provider>/config/modules/module[name='binding-broker-impl']/instance[name='binding-broker-impl']</provider>
+               </instance>
+       </service>
+       <service>
+               <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">binding-impl:binding-dom-mapping-service</type>
+               <instance>
+                       <name>ref_runtime-mapping-singleton</name>
+                       <provider>/config/modules/module[name='runtime-generated-mapping']/instance[name='runtime-mapping-singleton']</provider>
+               </instance>
+       </service>
+       <service>
+       <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+               <instance>
+                       <name>ref_dom-broker</name>
+                       <provider>/config/modules/module[name='dom-broker-impl']/instance[name='dom-broker']</provider>
+               </instance>
+       </service>
+       <service>
+               <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
+               <instance>
+               <name>ref_binding-data-broker</name>
+               <provider>/config/modules/module[name='binding-data-broker']/instance[name='binding-data-broker']</provider>
+       </instance>
+       </service>
+</services>
+</data>
+
+
+//END OF SNAPSHOT
+urn:opendaylight:params:xml:ns:yang:controller:config?module=config&revision=2013-04-05
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl?module=opendaylight-sal-binding-broker-impl&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl?module=opendaylight-sal-dom-broker-impl&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&revision=2013-10-28
+//END OF CONFIG
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/logback.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/logback.xml
new file mode 100644 (file)
index 0000000..1d17796
--- /dev/null
@@ -0,0 +1,16 @@
+<configuration scan="true">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+      </pattern>
+    </encoder>
+  </appender>
+
+
+  <logger name="org.opendaylight.yangtools.yang.parser.util.ModuleDependencySort" level="ERROR"/>
+
+  <root level="info">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml
new file mode 100644 (file)
index 0000000..dd7e36c
--- /dev/null
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>sal-remoterpc-connector-test-parent</artifactId>
+    <groupId>org.opendaylight.controller.tests</groupId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>sal-remoterpc-connector-test-nb</artifactId>
+  <packaging>bundle</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>${bundle.plugin.version}</version>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Export-Package>
+            </Export-Package>
+            <Import-Package>
+              com.sun.jersey.spi.container.servlet,
+              org.codehaus.jackson.annotate,
+              javax.ws.rs,
+              javax.ws.rs.core,
+              javax.xml.bind,
+              javax.xml.bind.annotation,
+              org.slf4j,
+              org.apache.catalina.filters,
+              org.codehaus.jackson.jaxrs,
+              org.opendaylight.controller.sample.zeromq.provider,
+              org.opendaylight.controller.sample.zeromq.consumer,
+              org.opendaylight.controller.sal.utils,
+              org.opendaylight.yangtools.yang.common,
+              org.opendaylight.controller.sal.connector.api,
+              org.opendaylight.controller.sal.connector.remoterpc.api;version="[0.4,1)",
+              org.opendaylight.controller.sal.connector.remoterpc.impl;version="[0.4,1)",
+              org.opendaylight.controller.sal.connector.remoterpc.dto,
+              org.opendaylight.controller.sal.connector.remoterpc.util,
+              org.osgi.framework,
+              com.google.common.base,
+              org.opendaylight.yangtools.yang.data.api,
+              !org.codehaus.enunciate.jaxrs
+
+            </Import-Package>
+            <Web-ContextPath>/controller/nb/v2/zmqnb</Web-ContextPath>
+            <Jaxrs-Resources>,${classes;ANNOTATION;javax.ws.rs.Path}</Jaxrs-Resources>
+          </instructions>
+          <manifestLocation>${project.basedir}/src/main/resources/META-INF</manifestLocation>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>containermanager</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>commons.northbound</artifactId>
+      <version>0.4.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller.tests</groupId>
+      <artifactId>sal-remoterpc-connector-test-provider</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller.tests</groupId>
+      <artifactId>sal-remoterpc-connector-test-consumer</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-remoterpc-connector</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+      <version>5.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>zeromq-routingtable.implementation</artifactId>
+          <version>0.4.1-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+      </dependency>
+  </dependencies>
+
+ </project>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java
new file mode 100644 (file)
index 0000000..6c9ec4e
--- /dev/null
@@ -0,0 +1,246 @@
+package org.opendaylight.controller.tests.zmqrouter.rest;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
+import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.osgi.framework.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Set;
+
+@Path("router")
+public class Router {
+  private Logger _logger = LoggerFactory.getLogger(Router.class);
+  private final URI namespace = URI.create("http://cisco.com/example");
+  private final QName QNAME = new QName(namespace, "heartbeat");
+
+
+  @GET
+  @Path("/hello")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String hello() {
+    return "Hello";
+  }
+
+  @GET
+  @Path("/announce")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String announce() {
+    _logger.info("Announce request received");
+
+    BundleContext ctx = getBundleContext();
+    ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+    if (providerRef == null) {
+      _logger.debug("Could not get provider reference");
+      return "Could not get provider reference";
+    }
+
+    ExampleProvider provider = (ExampleProvider) ctx.getService(providerRef);
+    if (provider == null) {
+      _logger.info("Could not get provider service");
+      return "Could not get provider service";
+    }
+
+    provider.announce(QNAME);
+    return "Announcement sent ";
+
+  }
+
+  @GET
+  @Path("/rpc")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String invokeRpc() throws Exception {
+    _logger.info("Invoking RPC");
+
+    ExampleConsumer consumer = getConsumer();
+    RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, new CompositeNodeImpl());
+    _logger.info("Result [{}]", result.isSuccessful());
+
+    return stringify(result);
+  }
+
+  @GET
+  @Path("/rpc-success")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String invokeRpcSuccess() throws Exception {
+    ExampleConsumer consumer = getConsumer();
+    RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, consumer.getValidCompositeNodeWithFourSimpleChildren()); //TODO: Change this
+    _logger.info("Result [{}]", result.isSuccessful());
+
+    return stringify(result);
+  }
+
+  @GET
+  @Path("/rpc-failure")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String invokeRpcFailure() throws Exception {
+    ExampleConsumer consumer = getConsumer();
+    //RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, consumer.getInvalidCompositeNodeCompositeChild()); //TODO: Change this
+    RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, null); //TODO: Change this
+    _logger.info("Result [{}]", result.isSuccessful());
+
+    return stringify(result);
+  }
+
+  @GET
+  @Path("/routingtable")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String invokeRoutingTable() {
+    _logger.info("Invoking adding an entry in routing table");
+
+    BundleContext ctx = getBundleContext();
+    ServiceReference routingTableServiceReference = ctx.getServiceReference(RoutingTable.class);
+    if (routingTableServiceReference == null) {
+      _logger.debug("Could not get routing table impl reference");
+      return "Could not get routingtable referen ";
+    }
+    RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
+    if (routingTable == null) {
+      _logger.info("Could not get routing table service");
+      return "Could not get routing table service";
+    }
+
+
+    RoutingIdentifierImpl rii = new RoutingIdentifierImpl();
+    try {
+      routingTable.addGlobalRoute(rii.toString(), "172.27.12.1:5000");
+    } catch (RoutingTableException e) {
+      _logger.error("error in adding routing identifier" + e.getMessage());
+
+    } catch (SystemException e) {
+      _logger.error("error in adding routing identifier" + e.getMessage());
+    }
+
+    Set<String> routes = routingTable.getRoutes(rii.toString());
+
+    StringBuilder stringBuilder = new StringBuilder();
+    for (String route : routes) {
+      stringBuilder.append(route);
+    }
+
+    _logger.info("Result [{}] routes added for route" + rii + stringBuilder.toString());
+
+    return stringBuilder.toString();
+  }
+
+  @GET
+  @Path("/routingtabledelete")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String invokeDeleteRoutingTable() {
+    _logger.info("Invoking adding an entry in routing table");
+
+    BundleContext ctx = getBundleContext();
+    ServiceReference routingTableServiceReference = ctx.getServiceReference(RoutingTable.class);
+    if (routingTableServiceReference == null) {
+      _logger.debug("Could not get routing table impl reference");
+      return "Could not get routingtable referen ";
+    }
+    RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
+    if (routingTable == null) {
+      _logger.info("Could not get routing table service");
+      return "Could not get routing table service";
+    }
+
+
+    RoutingIdentifierImpl rii = new RoutingIdentifierImpl();
+    try {
+      routingTable.removeGlobalRoute(rii.toString());
+    } catch (RoutingTableException e) {
+      _logger.error("error in adding routing identifier" + e.getMessage());
+
+    } catch (SystemException e) {
+      _logger.error("error in adding routing identifier" + e.getMessage());
+    }
+
+    Set<String> routes = routingTable.getRoutes(rii.toString());
+
+    StringBuilder stringBuilder = new StringBuilder();
+    if (routes != null) {
+      for (String route : routes) {
+        stringBuilder.append(route);
+      }
+    } else {
+      stringBuilder.append(" successfully");
+    }
+
+    _logger.info("Result [{}] routes removed for route" + rii + stringBuilder.toString());
+
+    return stringBuilder.toString();
+  }
+
+  private String stringify(RpcResult<CompositeNode> result) {
+    CompositeNode node = result.getResult();
+    StringBuilder builder = new StringBuilder("result:").append(XmlUtils.compositeNodeToXml(node)).append("\n")
+        .append("error:").append(result.getErrors()).append("\n");
+
+    return builder.toString();
+  }
+
+  private BundleContext getBundleContext() {
+    ClassLoader tlcl = Thread.currentThread().getContextClassLoader();
+    Bundle bundle = null;
+
+    if (tlcl instanceof BundleReference) {
+      bundle = ((BundleReference) tlcl).getBundle();
+    } else {
+      _logger.info("Unable to determine the bundle context based on " +
+          "thread context classloader.");
+      bundle = FrameworkUtil.getBundle(this.getClass());
+    }
+    return (bundle == null ? null : bundle.getBundleContext());
+  }
+
+  private ExampleConsumer getConsumer() {
+    BundleContext ctx = getBundleContext();
+    ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+    if (consumerRef == null) {
+      _logger.debug("Could not get consumer reference");
+      throw new NullPointerException("Could not get consumer reference");
+    }
+    ExampleConsumer consumer = (ExampleConsumer) ctx.getService(consumerRef);
+    if (consumer == null) {
+      _logger.info("Could not get consumer service");
+      throw new NullPointerException("Could not get consumer service");
+    }
+    return consumer;
+  }
+
+  class RoutingIdentifierImpl implements RpcRouter.RouteIdentifier, Serializable {
+
+    private final URI namespace = URI.create("http://cisco.com/example");
+    private final QName QNAME = new QName(namespace, "global");
+    private final QName instance = new QName(URI.create("127.0.0.1"), "local");
+
+    @Override
+    public QName getContext() {
+      return QNAME;
+    }
+
+    @Override
+    public QName getType() {
+      return QNAME;
+    }
+
+    @Override
+    public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getRoute() {
+      return InstanceIdentifier.of(instance);
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/resources/WEB-INF/web.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/resources/WEB-INF/web.xml
new file mode 100644 (file)
index 0000000..5bd2139
--- /dev/null
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+        version="3.0">
+  <servlet>
+    <servlet-name>JAXRSZmq</servlet-name>
+    <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+    <init-param>
+      <param-name>javax.ws.rs.Application</param-name>
+      <param-value>org.opendaylight.controller.northbound.commons.NorthboundApplication</param-value>
+    </init-param>
+    <load-on-startup>1</load-on-startup>
+  </servlet>
+
+  <servlet-mapping>
+    <servlet-name>JAXRSZmq</servlet-name>
+    <url-pattern>/*</url-pattern>
+  </servlet-mapping>
+
+
+
+        <security-constraint>
+          <web-resource-collection>
+            <web-resource-name>NB api</web-resource-name>
+            <url-pattern>/*</url-pattern>
+            <http-method>POST</http-method>
+            <http-method>GET</http-method>
+            <http-method>PUT</http-method>
+            <http-method>PATCH</http-method>
+            <http-method>DELETE</http-method>
+            <http-method>HEAD</http-method>
+          </web-resource-collection>
+          <auth-constraint>
+            <role-name>System-Admin</role-name>
+            <role-name>Network-Admin</role-name>
+            <role-name>Network-Operator</role-name>
+            <role-name>Container-User</role-name>
+          </auth-constraint>
+        </security-constraint>
+
+        <security-role>
+                <role-name>System-Admin</role-name>
+        </security-role>
+        <security-role>
+                <role-name>Network-Admin</role-name>
+        </security-role>
+        <security-role>
+                <role-name>Network-Operator</role-name>
+        </security-role>
+        <security-role>
+                <role-name>Container-User</role-name>
+        </security-role>
+
+        <login-config>
+                <auth-method>BASIC</auth-method>
+                <realm-name>opendaylight</realm-name>
+        </login-config>
+</web-app>
diff --git a/opendaylight/md-sal/sal-zeromq-connector/pom.xml b/opendaylight/md-sal/sal-zeromq-connector/pom.xml
deleted file mode 100644 (file)
index 7859908..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.opendaylight.controller</groupId>
-    <artifactId>sal-parent</artifactId>
-    <version>1.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>sal-zeromq-connector</artifactId>
-  <packaging>bundle</packaging>
-
-  <properties>
-    <scala.version>2.10.3</scala.version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.felix</groupId>
-        <artifactId>maven-bundle-plugin</artifactId>
-        <extensions>true</extensions>
-        <configuration>
-          <instructions>
-            <Import-Package>
-              org.opendaylight.controller.sal.connector.api,
-              org.opendaylight.controller.sal.core.api,
-              org.opendaylight.yangtools.concepts;version="[0.1,1)",
-              org.opendaylight.yangtools.yang.common;version="[0.5,1)",
-              org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
-              org.zeromq;version="[0.3,1)"
-            </Import-Package>
-            <Bundle-Activator>org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Activator</Bundle-Activator>
-          </instructions>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <version>3.1.6</version>
-        <configuration>
-          <recompileMode>incremental</recompileMode>
-          <args>
-            <arg>-target:jvm-1.7</arg>
-          </args>
-          <javacArgs>
-            <javacArg>-source</javacArg><javacArg>1.7</javacArg>
-            <javacArg>-target</javacArg><javacArg>1.7</javacArg>
-          </javacArgs>
-        </configuration>
-        <executions>
-          <execution>
-            <id>scala-compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-test-compile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-          </execution>
-        </executions>
-
-      </plugin>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>default-compile</id>
-            <phase>none</phase>
-          </execution>
-          <execution>
-            <id>default-testCompile</id>
-            <phase>none</phase>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>containermanager</artifactId>
-      <version>0.5.1-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>commons.northbound</artifactId>
-      <version>0.4.1-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal</artifactId>
-      <version>0.5.1-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.yangtools</groupId>
-      <artifactId>yang-binding</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.yangtools</groupId>
-      <artifactId>yang-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-connector-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>sal-common-util</artifactId>
-      <version>1.0-SNAPSHOT</version>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.jeromq</groupId>
-      <artifactId>jeromq</artifactId>
-      <version>0.3.0-SNAPSHOT</version>
-    </dependency>
-
-  </dependencies>
-  <repositories>
-    <repository>
-      <id>sonatype-nexus-snapshots</id>
-      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
-      <releases>
-        <enabled>false</enabled>
-      </releases>
-      <snapshots>
-        <enabled>true</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
-
-</project>
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java
deleted file mode 100644 (file)
index ba90f37..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.api;
-
-import java.util.Map;
-import java.util.Set;
-
-public interface RouteChange<I, R> {
-
-  Map<I, Set<R>> getRemovals();
-  Map<I, Set<R>> getAnnouncements();
-}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java
deleted file mode 100644 (file)
index 5b927a5..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
-
-import org.opendaylight.controller.sal.core.api.AbstractProvider;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.osgi.framework.BundleContext;
-
-public class Activator extends AbstractProvider {
-
-  ZeroMqRpcRouter router;
-
-  @Override
-  public void onSessionInitiated(ProviderSession session) {
-    router = ZeroMqRpcRouter.getInstance();
-    router.setBrokerSession(session);
-    router.start();
-  }
-
-  @Override
-  protected void stopImpl(BundleContext context) {
-    router.stop();
-  }
-
-}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java
deleted file mode 100644 (file)
index af94804..0000000
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-/**
- * ZeroMq based implementation of RpcRouter
- * TODO:
- *    1. Make it multi VM aware
- *    2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
- *    3. sendRpc() should use connection pooling
- *    4. Read properties from config file using existing(?) ODL properties framework
- */
-public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
-
-  private ExecutorService serverPool;
-  private static ExecutorService handlersPool;
-
-  private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
-
-  private ProviderSession brokerSession;
-
-  private ZMQ.Context context;
-  private ZMQ.Socket publisher;
-  private ZMQ.Socket subscriber;
-  private ZMQ.Socket replySocket;
-
-  private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
-
-  private final RpcFacade facade = new RpcFacade();
-  private final RpcListener listener = new RpcListener();
-
-  private final String localIp = getLocalIpAddress();
-
-  private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
-  private String subPort = System.getProperty("sub.port");// other controller's pub port
-  private String pubIp = System.getProperty("pub.ip"); // other controller's ip
-  private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
-
-  private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
-
-  //Prevent instantiation
-  private ZeroMqRpcRouter() {
-  }
-
-  public static ZeroMqRpcRouter getInstance() {
-    return _instance;
-  }
-
-  public void start() {
-    context = ZMQ.context(2);
-    publisher = context.socket(ZMQ.PUB);
-    int ret = publisher.bind("tcp://*:" + pubPort);
-    // serverPool = Executors.newSingleThreadExecutor();
-    serverPool = Executors.newCachedThreadPool();
-    handlersPool = Executors.newCachedThreadPool();
-    routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
-
-    // Start listening for announce and rpc messages
-    serverPool.execute(receive());
-
-    brokerSession.addRpcRegistrationListener(listener);
-
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationAdded(rpc);
-    }
-
-  }
-
-  public void stop() {
-    if (handlersPool != null)
-      handlersPool.shutdown();
-    if (serverPool != null)
-      serverPool.shutdown();
-    if (publisher != null) {
-      publisher.setLinger(0);
-      publisher.close();
-    }
-    if (replySocket != null) {
-      replySocket.setLinger(0);
-      replySocket.close();
-    }
-    if (subscriber != null) {
-      subscriber.setLinger(0);
-      subscriber.close();
-    }
-    if (context != null)
-      context.term();
-
-  }
-
-  private Runnable receive() {
-    return new Runnable() {
-      public void run() {
-        try {
-          // Bind to RPC reply socket
-          replySocket = context.socket(ZMQ.REP);
-          replySocket.bind("tcp://*:" + rpcPort);
-
-          // Bind to publishing controller
-          subscriber = context.socket(ZMQ.SUB);
-          String pubAddress = "tcp://" + pubIp + ":" + subPort;
-          subscriber.connect(pubAddress);
-          _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
-
-          //subscribe for announcements
-          //TODO: Message type would be changed. Update this
-          subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
-
-          // Poller enables listening on multiple sockets using a single thread
-          ZMQ.Poller poller = new ZMQ.Poller(2);
-          poller.register(replySocket, ZMQ.Poller.POLLIN);
-          poller.register(subscriber, ZMQ.Poller.POLLIN);
-
-          //TODO: Add code to restart the thread after exception
-          while (!Thread.currentThread().isInterrupted()) {
-
-            poller.poll();
-
-            if (poller.pollin(0)) {
-              handleRpcCall();
-            }
-            if (poller.pollin(1)) {
-              handleAnnouncement();
-            }
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-        replySocket.setLinger(0);
-        replySocket.close();
-        subscriber.setLinger(0);
-        subscriber.close();
-      }
-    };
-  }
-
-  /**
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  private void handleAnnouncement() throws IOException, ClassNotFoundException {
-
-    _logger.info("Announcement received");
-    Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
-
-    if (subscriber.hasReceiveMore()) {
-      try {
-        Message m = (Message) Message.deserialize(subscriber.recv());
-        _logger.debug("Announcement message [{}]", m);
-
-        // TODO: check on msg type or topic. Both
-        // should be same. Need to normalize.
-        if (Message.MessageType.ANNOUNCE == m.getType())
-          updateRoutingTable(m);
-      } catch (IOException | ClassNotFoundException e) {
-        e.printStackTrace();
-      }
-    }
-
-  }
-
-  /**
-   * @throws InterruptedException
-   * @throws ExecutionException
-   */
-  private void handleRpcCall() throws InterruptedException, ExecutionException {
-    try {
-      Message request = parseMessage(replySocket);
-
-      _logger.debug("Received rpc request [{}]", request);
-
-      // Call broker to process the message then reply
-      Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
-          (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
-
-      RpcResult<CompositeNode> result = rpc.get();
-
-      Message response = new Message.MessageBuilder()
-          .type(MessageType.RESPONSE)
-          .sender(localIp + ":" + rpcPort)
-          .route(request.getRoute())
-          //.payload(result)    TODO: enable and test
-          .build();
-
-      replySocket.send(Message.serialize(response));
-
-      _logger.debug("Sent rpc response [{}]", response);
-
-    } catch (IOException ex) {
-      //TODO: handle exception and send error codes to caller
-      ex.printStackTrace();
-    }
-  }
-
-
-  @Override
-  public Future<RpcReply<Object>> sendRpc(
-      final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
-
-    return handlersPool.submit(new Callable<RpcReply<Object>>() {
-
-      @Override
-      public RpcReply<Object> call() {
-        ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
-
-        // TODO pick the ip and port from routing table based on routing identifier
-        requestSocket.connect("tcp://" + pubIp + ":5554");
-
-        Message requestMessage = new Message.MessageBuilder()
-            .type(MessageType.REQUEST)
-            .sender(localIp + ":" + rpcPort)
-            .route(input.getRoutingInformation())
-            .payload(input.getPayload())
-            .build();
-
-        _logger.debug("Sending rpc request [{}]", requestMessage);
-
-        RpcReply<Object> reply = null;
-
-        try {
-
-          requestSocket.send(Message.serialize(requestMessage));
-          final Message response = parseMessage(requestSocket);
-
-          _logger.debug("Received response [{}]", response);
-
-          reply = new RpcReply<Object>() {
-
-            @Override
-            public Object getPayload() {
-              return response.getPayload();
-            }
-          };
-        } catch (IOException ex) {
-          // TODO: Pass exception back to the caller
-          ex.printStackTrace();
-        }
-
-        return reply;
-      }
-    });
-  }
-
-  /**
-   * TODO: Remove this implementation and use RoutingTable implementation to send announcements
-   * Publishes a notice to other controllers in the cluster
-   *
-   * @param notice
-   */
-  public void publish(final Message notice) {
-    Runnable task = new Runnable() {
-      public void run() {
-
-        try {
-
-          publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
-          publisher.send(Message.serialize(notice));
-          _logger.debug("Announcement sent [{}]", notice);
-        } catch (IOException ex) {
-          _logger.error("Error in sending announcement [{}]", notice);
-          ex.printStackTrace();
-        }
-      }
-    };
-    handlersPool.execute(task);
-  }
-
-  /**
-   * Finds IPv4 address of the local VM
-   * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
-   * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
-   * Should we use IP or hostname?
-   *
-   * @return
-   */
-  private String getLocalIpAddress() {
-    String hostAddress = null;
-    Enumeration e = null;
-    try {
-      e = NetworkInterface.getNetworkInterfaces();
-    } catch (SocketException e1) {
-      e1.printStackTrace();
-    }
-    while (e.hasMoreElements()) {
-
-      NetworkInterface n = (NetworkInterface) e.nextElement();
-
-      Enumeration ee = n.getInetAddresses();
-      while (ee.hasMoreElements()) {
-        InetAddress i = (InetAddress) ee.nextElement();
-        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
-          hostAddress = i.getHostAddress();
-      }
-    }
-    return hostAddress;
-
-  }
-
-  /**
-   * TODO: Change to use external routing table implementation
-   *
-   * @param msg
-   */
-  private void updateRoutingTable(Message msg) {
-    routingTable.put(msg.getRoute(), msg.getSender());
-    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
-
-    // Currently only registers rpc implementation.
-    // TODO: do registration for instance based routing
-    QName rpcType = route.getType();
-    RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
-    _logger.debug("Routing table updated");
-  }
-
-  /**
-   * @param socket
-   * @return
-   */
-  private Message parseMessage(ZMQ.Socket socket) {
-
-    Message msg = null;
-    try {
-      byte[] bytes = socket.recv();
-      _logger.debug("Received bytes:[{}]", bytes.length);
-      msg = (Message) Message.deserialize(bytes);
-    } catch (Throwable t) {
-      t.printStackTrace();
-    }
-    return msg;
-  }
-
-  private class RpcFacade implements RpcImplementation {
-
-    @Override
-    public Set<QName> getSupportedRpcs() {
-      return Collections.emptySet();
-    }
-
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(rpc);
-
-      RpcRequestImpl request = new RpcRequestImpl();
-      request.setRouteIdentifier(routeId);
-      request.setPayload(input);
-
-      final Future<RpcReply<Object>> ret = sendRpc(request);
-
-      //TODO: Review result handling
-      RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
-        @Override
-        public boolean isSuccessful() {
-          try {
-            ret.get();
-          } catch (InterruptedException | ExecutionException e) {
-            e.printStackTrace();
-            return false;
-          }
-          return true;
-        }
-
-        @Override
-        public CompositeNode getResult() {
-          return null;
-        }
-
-        @Override
-        public Collection<RpcError> getErrors() {
-          return Collections.EMPTY_LIST;
-        }
-      };
-      return result;
-    }
-  }
-
-  /**
-   * Listener for rpc registrations
-   */
-  private class RpcListener implements RpcRegistrationListener {
-
-    @Override
-    public void onRpcImplementationAdded(QName name) {
-
-      _logger.debug("Announcing registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      //TODO: Make notice immutable and change message type
-      Message notice = new Message.MessageBuilder()
-          .type(MessageType.ANNOUNCE)
-          .sender("tcp://" + localIp + ":" + rpcPort)
-          .route(routeId)
-          .build();
-
-      publish(notice);
-    }
-
-    @Override
-    public void onRpcImplementationRemoved(QName name) {
-      // TODO: send a rpc-deregistrtation notice
-
-    }
-  }
-
-  public void setBrokerSession(ProviderSession session) {
-    this.brokerSession = session;
-
-  }
-
-}
diff --git a/opendaylight/md-sal/test/pom.xml b/opendaylight/md-sal/test/pom.xml
deleted file mode 100644 (file)
index f9e500e..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-       <modelVersion>4.0.0</modelVersion>
-       <parent>
-               <artifactId>sal-parent</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <groupId>org.opendaylight.controller</groupId>
-       </parent>
-       <packaging>pom</packaging>
-       <groupId>org.opendaylight.controller.tests</groupId>
-       <artifactId>sal-test-parent</artifactId>
-    <scm>
-      <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
-      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
-      <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
-    </scm>
-
-       <modules>
-               <module>zeromq-test-consumer</module>
-        <module>zeromq-test-it</module>
-        <module>zeromq-test-provider</module>
-       </modules>
-       
-</project>
diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java b/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java
deleted file mode 100644 (file)
index a56a7de..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.opendaylight.controller.sample.zeromq.consumer;
-
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.controller.sal.core.api.AbstractConsumer;
-import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-
-public class ExampleConsumer extends AbstractConsumer {
-
-    private final URI namespace = URI.create("http://cisco.com/example");
-    private final QName QNAME = new QName(namespace,"heartbeat");
-    
-    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-    private ConsumerSession session;
-    
-    
-    @Override
-    public void onSessionInitiated(ConsumerSession session) {
-        this.session = session;
-        executor.scheduleAtFixedRate(new Runnable() {
-            
-            @Override
-            public void run() {
-                int count = 0;
-                try {
-                    Future<RpcResult<CompositeNode>> future = ExampleConsumer.this.session.rpc(QNAME, null);
-                    RpcResult<CompositeNode> result = future.get();
-                    System.out.println("Result received. Status is :" + result.isSuccessful());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-                
-            }
-        }, 0, 10, TimeUnit.SECONDS);
-    }
-    
-    @Override
-    protected void stopImpl(BundleContext context) {
-        // TODO Auto-generated method stub
-        super.stopImpl(context);
-        executor.shutdown();
-    }
-}
diff --git a/opendaylight/md-sal/test/zeromq-test-it/pom.xml b/opendaylight/md-sal/test/zeromq-test-it/pom.xml
deleted file mode 100644 (file)
index 56945d1..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>sal-test-parent</artifactId>
-        <groupId>org.opendaylight.controller.tests</groupId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>zeromq-test-it</artifactId>
-    <scm>
-        <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
-        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
-        <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
-    </scm>
-
-    <properties>
-        <exam.version>3.0.0</exam.version>
-        <url.version>1.5.0</url.version>
-    </properties>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.ops4j.pax.exam</groupId>
-                <artifactId>maven-paxexam-plugin</artifactId>
-                <version>1.2.4</version>
-                <executions>
-                    <execution>
-                        <id>generate-config</id>
-                        <goals>
-                            <goal>generate-depends-file</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-        <pluginManagement>
-            <plugins>
-                <!--This plugin's configuration is used to store Eclipse m2e settings 
-                    only. It has no influence on the Maven build itself. -->
-                <plugin>
-                    <groupId>org.eclipse.m2e</groupId>
-                    <artifactId>lifecycle-mapping</artifactId>
-                    <version>1.0.0</version>
-                    <configuration>
-                        <lifecycleMappingMetadata>
-                            <pluginExecutions>
-                                <pluginExecution>
-                                    <pluginExecutionFilter>
-                                        <groupId>
-                                            org.ops4j.pax.exam
-                                        </groupId>
-                                        <artifactId>
-                                            maven-paxexam-plugin
-                                        </artifactId>
-                                        <versionRange>
-                                            [1.2.4,)
-                                        </versionRange>
-                                        <goals>
-                                            <goal>
-                                                generate-depends-file
-                                            </goal>
-                                        </goals>
-                                    </pluginExecutionFilter>
-                                    <action>
-                                        <ignore></ignore>
-                                    </action>
-                                </pluginExecution>
-                            </pluginExecutions>
-                        </lifecycleMappingMetadata>
-                    </configuration>
-                </plugin>
-            </plugins>
-        </pluginManagement>
-    </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.opendaylight.yangtools.thirdparty</groupId>
-            <artifactId>xtend-lib-osgi</artifactId>
-            <version>2.4.3</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller.tests</groupId>
-            <artifactId>zeromq-test-provider</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller.tests</groupId>
-            <artifactId>zeromq-test-consumer</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal-broker-impl</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.ops4j.pax.exam</groupId>
-            <artifactId>pax-exam-container-native</artifactId>
-            <version>${exam.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.ops4j.pax.exam</groupId>
-            <artifactId>pax-exam-junit4</artifactId>
-            <version>${exam.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.ops4j.pax.exam</groupId>
-            <artifactId>pax-exam-link-mvn</artifactId>
-            <version>${exam.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>equinoxSDK381</groupId>
-            <artifactId>org.eclipse.osgi</artifactId>
-            <version>3.8.1.v20120830-144521</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-            <version>1.7.2</version>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-            <version>1.0.9</version>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-            <version>1.0.9</version>
-        </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>sal-binding-api</artifactId>
-        <version>1.0-SNAPSHOT</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>sal-common-util</artifactId>
-        <version>1.0-SNAPSHOT</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>sal-core-api</artifactId>
-        <version>1.0-SNAPSHOT</version>
-      </dependency>
-
-
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>containermanager</artifactId>
-        <version>0.5.1-SNAPSHOT</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>sal</artifactId>
-        <version>0.5.1-SNAPSHOT</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-binding</artifactId>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-common</artifactId>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-data-api</artifactId>
-      </dependency>
-
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>sal-common-util</artifactId>
-        <version>1.0-SNAPSHOT</version>
-      </dependency>
-    </dependencies>
-</project>
diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java
deleted file mode 100644 (file)
index c17b143..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.opendaylight.controller.sample.zeromq.test.it;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.BundleContext;
-
-import javax.inject.Inject;
-
-import static org.junit.Assert.assertTrue;
-import static org.ops4j.pax.exam.CoreOptions.*;
-
-@RunWith(PaxExam.class)
-public class ServiceConsumerController {
-
-    public static final String ODL = "org.opendaylight.controller";
-    public static final String YANG = "org.opendaylight.yangtools";
-    public static final String SAMPLE = "org.opendaylight.controller.samples";
-
-    @Test
-    public void properInitialized() throws Exception {
-
-        Thread.sleep(30000); // Waiting for services to get wired.
-        assertTrue(true);
-        //assertTrue(consumer.createToast(WhiteBread.class, 5));
-
-    }
-
-//    @Inject
-//    BindingAwareBroker broker;
-
-//    @Inject
-//    ToastConsumer consumer;
-
-    @Inject
-    BundleContext ctx;
-
-    @Configuration
-    public Option[] config() {
-        return options(systemProperty("osgi.console").value("2401"),
-                systemProperty("pub.port").value("5557"),
-                systemProperty("sub.port").value("5556"),
-                systemProperty("rpc.port").value("5555"),
-                systemProperty("pub.ip").value("localhost"),
-                mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
-                mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
-                mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
-                mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
-               
-                //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
-                mavenBundle(ODL, "sal-common").versionAsInProject(), //
-                mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
-                mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
-                mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
-                mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
-                mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
-                mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
-                mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
-                mavenBundle(SAMPLE, "zeromq-test-consumer").versionAsInProject(), //
-                mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
-                mavenBundle(YANG, "concepts").versionAsInProject(),
-                mavenBundle(YANG, "yang-binding").versionAsInProject(), //
-                mavenBundle(YANG, "yang-common").versionAsInProject(), //
-                mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
-                mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
-                mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
-                mavenBundle("com.google.guava", "guava").versionAsInProject(), //
-                mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
-                junitBundles()
-                );
-    }
-
-}
diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java
deleted file mode 100644 (file)
index 2d28b0b..0000000
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.opendaylight.controller.sample.zeromq.test.it;
-
-import static org.junit.Assert.*;
-import static org.ops4j.pax.exam.CoreOptions.junitBundles;
-import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
-import static org.ops4j.pax.exam.CoreOptions.options;
-import static org.ops4j.pax.exam.CoreOptions.systemPackages;
-import static org.ops4j.pax.exam.CoreOptions.systemProperty;
-import static org.ops4j.pax.exam.CoreOptions.maven;
-
-import java.util.Collection;
-
-import javax.inject.Inject;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.CoreOptions;
-import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-
-@RunWith(PaxExam.class)
-public class ServiceProviderController {
-
-    public static final String ODL = "org.opendaylight.controller";
-    public static final String YANG = "org.opendaylight.yangtools";
-    public static final String SAMPLE = "org.opendaylight.controller.samples";
-
-    @Test
-    public void properInitialized() throws Exception {
-
-        Thread.sleep(30000); // Waiting for services to get wired.
-        assertTrue(true);
-        //assertTrue(consumer.createToast(WhiteBread.class, 5));
-
-    }
-
-//    @Inject
-//    BindingAwareBroker broker;
-
-//    @Inject
-//    ToastConsumer consumer;
-
-    @Inject
-    BundleContext ctx;
-
-    @Configuration
-    public Option[] config() {
-        return options(systemProperty("osgi.console").value("2401"),
-                systemProperty("pub.port").value("5556"),
-                systemProperty("sub.port").value("5557"),
-                systemProperty("rpc.port").value("5554"),
-                systemProperty("pub.ip").value("localhost"),
-                mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
-                mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
-                mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
-                mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
-               
-                //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
-                mavenBundle(ODL, "sal-common").versionAsInProject(), //
-                mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
-                mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
-                mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
-                mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
-                mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
-                mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
-                mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
-                mavenBundle(SAMPLE, "zeromq-test-provider").versionAsInProject(), //
-                mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
-                mavenBundle(YANG, "concepts").versionAsInProject(),
-                mavenBundle(YANG, "yang-binding").versionAsInProject(), //
-                mavenBundle(YANG, "yang-common").versionAsInProject(), //
-                mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
-                mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
-                mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
-                mavenBundle("com.google.guava", "guava").versionAsInProject(), //
-                mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
-                junitBundles()
-                );
-    }
-
-}
diff --git a/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java
deleted file mode 100644 (file)
index ec7d7a8..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.opendaylight.controller.sample.zeromq.provider;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.AbstractProvider;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-
-public class ExampleProvider extends AbstractProvider implements RpcImplementation  {
-
-    private final URI namespace = URI.create("http://cisco.com/example");
-    private final QName QNAME = new QName(namespace,"heartbeat");
-    private RpcRegistration reg;
-    
-    
-    @Override
-    public void onSessionInitiated(ProviderSession session) {
-      //Adding heartbeat 10 times just to make sure subscriber get it
-      for (int i=0;i<10;i++){
-        System.out.println("ExampleProvider: Adding " + QNAME + " " + i);
-        reg = session.addRpcImplementation(QNAME, this);
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
-      }
-    }
-    
-    @Override
-    public Set<QName> getSupportedRpcs() {
-        return Collections.singleton(QNAME);
-    }
-    
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-        if(QNAME.equals(rpc)) {
-            RpcResult<CompositeNode> output = Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
-            return output;
-        }
-        RpcResult<CompositeNode> output = Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
-        return output;
-    }
-    
-    @Override
-    protected void stopImpl(BundleContext context) {
-     if(reg != null) {
-         try {
-            reg.close();
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-     }
-    }
-
-}
index 37c973e..2926786 100644 (file)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.opendaylight.controller</groupId>
                 <extensions>true</extensions>
                 <configuration>
                     <instructions>
-
+                        <Export-Package>
+                            org.opendaylight.controller.sal.connector.remoterpc.api,
+                            org.opendaylight.controller.sal.connector.remoterpc.impl
+                        </Export-Package>
                         <Import-Package>
                             javax.xml.bind.annotation,
                             org.opendaylight.controller.sal.core,
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal</artifactId>
-            <version>0.5.1-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.osgi</groupId>
+                    <artifactId>org.osgi.compendium</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>