- This provides implementation for enabling remote rpc calls between 2 instances of md-sal.
The current implementation enables remote execution of globally unique services in the
cluster. For details, please refer to this wiki page
(https://wiki.opendaylight.org/view/Zeromq_connector). This wiki page is a draft.
- Added relativePath in pom so that parent pom can be found.
- Removed dependency to sal-infinispan-routingtable
- Exported "impl" as well from zeromq-routingtable. Fixed dependencies in RouterTest.
- Removed oss.sonatype release repo from md-sal pom. ODL nexus repo mirrors it.
- Updated server code to handle exception
- Server code now uses WB pattern instead of listerner pattern.
- Fixed pom so that parent can be resolved
- Rebased due to changed in unmerged dependency
- Added state machine to RpcSocket.
- Added unit tests to RpcSocketTest and SocketManagerTest.
- Added CompositeNode methods to ExampleConsumer & XML files for creation of CompositeNodes
- Added CompositeNode testcases to RouterTest
- Translated scala code to java
- Added code to convert CompositeNode to xml and back to help
- with serialization.
- Added more unit and integration tests.
This is squash for:
https://git.opendaylight.org/gerrit/2882
https://git.opendaylight.org/gerrit/3022
https://git.opendaylight.org/gerrit/3028
https://git.opendaylight.org/gerrit/3159
Change-Id: I44739fd8ad61043c2e786875bb7787e3fa68e435
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
Signed-off-by: Alex Fan <railor33@gmail.com>
<groupId>org.opendaylight.controller.thirdparty</groupId>
<artifactId>ganymed</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-remoterpc-connector</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>
+ zeromq-routingtable.implementation
+ </artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.zeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <version>0.3.1</version>
+ </dependency>
</dependencies>
</profile>
<profile>
</scm>
<modules>
- <!-- Common APIs & Implementation -->
+ <!-- Common APIs & Implementation -->
<module>sal-common</module>
<module>sal-common-api</module>
<module>sal-common-impl</module>
<module>sal-connector-api</module>
<module>sal-rest-connector</module>
<module>sal-netconf-connector</module>
-
+
+ <module>zeromq-routingtable/implementation</module>
+ <module>sal-remoterpc-connector/implementation</module>
<!-- Clustered Data Store -->
<module>clustered-data-store/implementation</module>
<module>inventory-manager</module>
<module>statistics-manager</module>
<module>forwardingrules-manager</module>
-
+
<!-- Compability Packages -->
<module>compatibility</module>
- <module>zeromq-routingtable/implementation</module>
- <module>sal-zeromq-connector</module>
</modules>
<profiles>
<profile>
- <id>integrationtests</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
+ <id>integrationtests</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
<modules>
<module>sal-binding-it</module>
- <module>zeromq-routingtable/integrationtest</module>
<module>clustered-data-store/integrationtest</module>
- <module>test</module>
+ <!--module>zeromq-routingtable/integrationtest</module -->
+ <!--module>sal-remoterpc-connector/integrationtest</module -->
+ <!--module>test/sal-rest-connector-it</modulei -->
</modules>
</profile>
<profile>
- <id>IDE</id>
- <activation>
- <property>
- <name>m2e.version</name>
- </property>
- </activation>
- <build>
- <!-- Put the IDE's build output in a folder other than target, so that IDE builds don't interact with Maven builds -->
- <directory>target-ide</directory>
- </build>
+ <id>IDE</id>
+ <activation>
+ <property>
+ <name>m2e.version</name>
+ </property>
+ </activation>
+ <build>
+ <!-- Put the IDE's build output in a folder other than target,
+ so that IDE builds don't interact with Maven builds -->
+ <directory>target-ide</directory>
+ </build>
</profile>
</profiles>
<guava.version>14.0.1</guava.version>
<osgi.core.version>5.0.0</osgi.core.version>
<junit.version>4.8.1</junit.version>
+ <powermock.version>1.5.1</powermock.version>
+ <mockito.version>1.9.5</mockito.version>
<xtend.version>2.4.3</xtend.version>
<maven.clean.plugin.version>2.5</maven.clean.plugin.version>
<jacoco.version>0.5.3.201107060350</jacoco.version>
+ <sal.version>0.5.1-SNAPSHOT</sal.version> <!-- AD Sal version -->
+
<!-- Sonar properties using jacoco to retrieve integration test results -->
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
<pluginRepositories>
<!-- OpenDayLight Repo Mirror -->
<pluginRepository>
- <id>opendaylight-mirror</id>
- <name>opendaylight-mirror</name>
- <url>${nexusproxy}/groups/public/</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- <releases>
- <enabled>true</enabled>
- <updatePolicy>never</updatePolicy>
- </releases>
+ <id>opendaylight-mirror</id>
+ <name>opendaylight-mirror</name>
+ <url>${nexusproxy}/groups/public/</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ <updatePolicy>never</updatePolicy>
+ </releases>
</pluginRepository>
<!-- OpenDayLight Snapshot artifact -->
<pluginRepository>
- <id>opendaylight-snapshot</id>
- <name>opendaylight-snapshot</name>
- <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- <releases>
- <enabled>false</enabled>
- </releases>
+ <id>opendaylight-snapshot</id>
+ <name>opendaylight-snapshot</name>
+ <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
</pluginRepository>
</pluginRepositories>
<repositories>
<!-- OpenDayLight Repo Mirror -->
<repository>
- <id>opendaylight-mirror</id>
- <name>opendaylight-mirror</name>
- <url>${nexusproxy}/groups/public/</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- <releases>
- <enabled>true</enabled>
- <updatePolicy>never</updatePolicy>
- </releases>
+ <id>opendaylight-mirror</id>
+ <name>opendaylight-mirror</name>
+ <url>${nexusproxy}/groups/public/</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ <updatePolicy>never</updatePolicy>
+ </releases>
</repository>
<!-- OpenDayLight Snapshot artifact -->
<repository>
- <id>opendaylight-snapshot</id>
- <name>opendaylight-snapshot</name>
- <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- <releases>
- <enabled>false</enabled>
- </releases>
+ <id>opendaylight-snapshot</id>
+ <name>opendaylight-snapshot</name>
+ <url> ${nexusproxy}/repositories/opendaylight.snapshot/</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
</repository>
</repositories>
<artifactId>yang-data-api</artifactId>
<version>${yang.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-impl</artifactId>
+ <version>${yang.version}</version>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-model-api</artifactId>
<artifactId>sal-connector-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>${sal.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- Supporting Libraries -->
<dependency>
<artifactId>org.eclipse.xtend.lib</artifactId>
<version>${xtend.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>${osgi.core.version}</version>
+ </dependency>
<!-- Testing Dependencies -->
<dependency>
<groupId>junit</groupId>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
- <version>1.9.5</version>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <version>${powermock.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<artifactId>maven-bundle-plugin</artifactId>
<version>${bundle.plugin.version}</version>
<extensions>true</extensions>
- <!--executions>
- <execution>
- <id>bundle-manifest</id>
- <phase>process-classes</phase>
- <goals>
- <goal>manifest</goal>
- </goals>
- </execution>
- </executions-->
+ <!--executions> <execution> <id>bundle-manifest</id>
+ <phase>process-classes</phase> <goals> <goal>manifest</goal> </goals> </execution>
+ </executions -->
<configuration>
<instructions>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version>
</plugin>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <!--This plugin's configuration is used to store Eclipse
+ m2e settings only. It has no influence on the Maven build itself. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
</goals>
</pluginExecutionFilter>
<action>
- <ignore/>
+ <ignore />
</action>
</pluginExecution>
<pluginExecution>
</goals>
</pluginExecutionFilter>
<action>
- <ignore/>
+ <ignore />
</action>
</pluginExecution>
<pluginExecution>
</goals>
</pluginExecutionFilter>
<action>
- <ignore/>
+ <ignore />
</action>
</pluginExecution>
</pluginExecutions>
*/
package org.opendaylight.controller.sal.common.util;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
return ret;
}
- private static class RpcResultTO<T> implements RpcResult<T> {
+ private static class RpcResultTO<T> implements RpcResult<T>, Serializable {
private final Collection<RpcError> errors;
private final T result;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
-public abstract class AbstractConsumer implements Consumer, BundleActivator {
+public abstract class AbstractConsumer implements Consumer, BundleActivator,ServiceTrackerCustomizer<Broker, Broker> {
+
+
+
+
+ private BundleContext context;
+ private ServiceTracker<Broker, Broker> tracker;
+ private Broker broker;
- Broker broker;
- ServiceReference<Broker> brokerRef;
@Override
public final void start(BundleContext context) throws Exception {
+ this.context = context;
this.startImpl(context);
- brokerRef = context.getServiceReference(Broker.class);
- broker = context.getService(brokerRef);
- broker.registerConsumer(this,context);
+ tracker = new ServiceTracker<>(context, Broker.class, this);
+ tracker.open();
}
public final void stop(BundleContext context) throws Exception {
stopImpl(context);
broker = null;
- if(brokerRef != null) {
- context.ungetService(brokerRef);
- }
+ tracker.close();
}
protected void startImpl(BundleContext context) {
return Collections.emptySet();
}
+
+ @Override
+ public Broker addingService(ServiceReference<Broker> reference) {
+ if(broker == null) {
+ broker = context.getService(reference);
+ broker.registerConsumer(this, context);
+ return broker;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<Broker> reference, Broker service) {
+ // NOOP
+ }
+
+ @Override
+ public void removedService(ServiceReference<Broker> reference, Broker service) {
+ stopImpl(context);
+ }
}
import java.util.Collection;
import java.util.Collections;
+import javax.naming.Context;
+
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
-public abstract class AbstractProvider implements BundleActivator, Provider {
+public abstract class AbstractProvider implements BundleActivator, Provider,ServiceTrackerCustomizer<Broker, Broker> {
- private ServiceReference<Broker> brokerRef;
private Broker broker;
-
+ private BundleContext context;
+ private ServiceTracker<Broker, Broker> tracker;
@Override
public Collection<ProviderFunctionality> getProviderFunctionality() {
return Collections.emptySet();
@Override
public final void start(BundleContext context) throws Exception {
- brokerRef = context.getServiceReference(Broker.class);
- broker = context.getService(brokerRef);
-
+ this.context = context;
this.startImpl(context);
-
- broker.registerProvider(this,context);
+ tracker = new ServiceTracker<>(context, Broker.class, this);
+ tracker.open();
}
protected void startImpl(BundleContext context) {
@Override
public final void stop(BundleContext context) throws Exception {
+ broker = null;
+ tracker.close();
+ tracker = null;
stopImpl(context);
}
+ @Override
+ public Broker addingService(ServiceReference<Broker> reference) {
+ if(broker == null) {
+ broker = context.getService(reference);
+ broker.registerProvider(this, context);
+ return broker;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<Broker> reference, Broker service) {
+ // NOOP
+ }
+
+ @Override
+ public void removedService(ServiceReference<Broker> reference, Broker service) {
+ stopImpl(context);
+ }
+
}
import org.opendaylight.controller.sal.core.api.data.DataStore
import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareDataStoreAdapter
import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener
+import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl
class BrokerConfigActivator implements AutoCloseable {
val emptyProperties = new Hashtable<String, String>();
broker.setBundleContext(context);
-
+ broker.setRouter(new RpcRouterImpl("Rpc router"))
schemaService = new SchemaServiceImpl();
schemaService.setContext(context);
schemaService.setParser(new YangParserImpl());
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <relativePath>../..</relativePath>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sal-remoterpc-connector</artifactId>
+ <packaging>bundle</packaging>
+
+ <properties>
+ <zeromq.version>0.3.1</zeromq.version>
+ <jackson.version>1.9.8</jackson.version>
+ <stax.version>1.0.1</stax.version>
+ </properties>
+
+ <dependencies>
+ <!-- MD Sal interdependencies -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>sal-core-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>sal-connector-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>zeromq-routingtable.implementation</artifactId>
+ <!-- TODO: fix the version. Why is it not MD Sal project version?-->
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+
+ <!-- AD Sal -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ </dependency>
+
+ <!-- Yang tools -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+
+ <!-- Third Party -->
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.zeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <version>${zeromq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId> org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>stax</groupId>
+ <artifactId>stax-api</artifactId>
+ <version>${stax.version}</version>
+ </dependency>
+
+ <!-- Tests -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>${bundle.plugin.version}</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Import-Package>
+ *,
+ !org.codehaus.enunciate.jaxrs
+ </Import-Package>
+ <Export-Package>
+ org.opendaylight.controller.config.yang.md.sal.remote.rpc,
+ org.opendaylight.controller.sal.connector.remoterpc,
+ org.opendaylight.controller.sal.connector.remoterpc.*
+ </Export-Package>
+ <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ </instructions>
+ </configuration>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ <version>0.5.9-SNAPSHOT</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ <configuration>
+ <codeGenerators>
+ <generator>
+ <codeGeneratorClass>
+ org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+ </codeGeneratorClass>
+ <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>
+ <additionalConfiguration>
+ <namespaceToPackage1>
+ urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang
+ </namespaceToPackage1>
+ </additionalConfiguration>
+ </generator>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+ <outputBaseDir>target/site/models</outputBaseDir>
+ </generator>
+ </codeGenerators>
+ <inspectDependencies>true</inspectDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>yang-jmx-generator-plugin</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>maven-sal-api-gen-plugin</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <type>jar</type>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+/**
+* Generated file
+
+* Generated from: yang module name: odl-sal-dom-rpc-remote-cfg yang module local name: remote-zeromq-rpc-server
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Thu Dec 05 14:25:21 CET 2013
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.Client;
+import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider;
+import org.opendaylight.controller.sal.connector.remoterpc.RoutingTableProvider;
+import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.osgi.framework.BundleContext;
+
+/**
+*
+*/
+public final class ZeroMQServerModule extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule
+ {
+
+ private static final Integer ZEROMQ_ROUTER_PORT = 5554;
+ private BundleContext bundleContext;
+
+ public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+ ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) {
+
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ protected void customValidation(){
+ // Add custom validation for module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+
+ Broker broker = getDomBrokerDependency();
+ RoutingTableProvider provider = new RoutingTableProvider(bundleContext);
+
+
+ final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
+
+ ServerImpl serverImpl = new ServerImpl(port);
+
+ Client clientImpl = new Client();
+ RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
+
+ facade.setRoutingTableProvider(provider );
+
+ broker.registerProvider(facade, bundleContext);
+ return facade;
+ }
+
+ public void setBundleContext(BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ }
+}
--- /dev/null
+/**
+* Generated file
+
+* Generated from: yang module name: odl-sal-dom-rpc-remote-cfg yang module local name: remote-zeromq-rpc-server
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Thu Dec 05 14:25:21 CET 2013
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+/**
+*
+*/
+public class ZeroMQServerModuleFactory extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModuleFactory
+{
+
+ @Override
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+ ZeroMQServerModule module = (ZeroMQServerModule) super.createModule(instanceName, dependencyResolver, bundleContext);
+ module.setBundleContext(bundleContext);
+ return module;
+ }
+
+ @Override
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+ ZeroMQServerModule module = (ZeroMQServerModule) super.createModule(instanceName, dependencyResolver, old,bundleContext);
+ module.setBundleContext(bundleContext);
+ return module;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * An implementation of {@link RpcImplementation} that makes remote RPC calls
+ */
+public class Client implements RemoteRpcClient {
+
+ private final Logger _logger = LoggerFactory.getLogger(Client.class);
+
+ private final LinkedBlockingQueue<MessageWrapper> requestQueue = new LinkedBlockingQueue<MessageWrapper>(100);
+
+ private final ExecutorService pool = Executors.newSingleThreadExecutor();
+ private final long TIMEOUT = 5000; // in ms
+
+ private RoutingTableProvider routingTableProvider;
+
+ public RoutingTableProvider getRoutingTableProvider() {
+ return routingTableProvider;
+ }
+
+ public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+ this.routingTableProvider = routingTableProvider;
+ }
+
+ public LinkedBlockingQueue<MessageWrapper> getRequestQueue() {
+ return requestQueue;
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ // TODO: Find the entries from routing table
+ return Collections.emptySet();
+ }
+
+ public void start() {
+ pool.execute(new Sender(this));
+
+ }
+
+ public void stop() {
+
+ _logger.debug("Client stopping...");
+ Context.getInstance().getZmqContext().term();
+ _logger.debug("ZMQ context terminated");
+
+ pool.shutdown(); // intiate shutdown
+ try {
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ _logger.error("Client thread pool did not shut down");
+ }
+ } catch (InterruptedException e) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ _logger.debug("Client stopped");
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ String address = lookupRemoteAddress(routeId);
+
+ Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST)
+ .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId)
+ .payload(XmlUtils.compositeNodeToXml(input)).build();
+
+ List<RpcError> errors = new ArrayList<RpcError>();
+
+ try (SocketPair pair = new SocketPair()) {
+
+ MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender());
+ process(messageWrapper);
+ Message response = parseMessage(pair.getReceiver());
+
+ CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+
+ return Rpcs.getRpcResult(true, payload, errors);
+
+ } catch (Exception e) {
+ collectErrors(e, errors);
+ return Rpcs.getRpcResult(false, null, errors);
+ }
+
+ }
+
+ public void process(MessageWrapper msg) throws TimeoutException, InterruptedException {
+ _logger.debug("Processing message [{}]", msg);
+
+ boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!success)
+ throw new TimeoutException("Queue is full");
+ }
+
+ /**
+ * Block on socket for reply
+ *
+ * @param receiver
+ * @return
+ */
+ private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException {
+ return (Message) Message.deserialize(receiver.recv());
+ }
+
+ /**
+ * Find address for the given route identifier in routing table
+ *
+ * @param routeId
+ * route identifier
+ * @return remote network address
+ */
+ private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) {
+ checkNotNull(routeId, "route must not be null");
+
+ Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+ Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
+ checkNotNull(addresses, "Address not found for route [%s]", routeId);
+ checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its
+ // a
+ // global
+ // service.
+
+ String address = addresses.iterator().next();
+ checkNotNull(address, "Address not found for route [%s]", routeId);
+
+ return address;
+ }
+
+ private void collectErrors(Exception e, List<RpcError> errors) {
+ if (e == null)
+ return;
+ if (errors == null)
+ errors = new ArrayList<RpcError>();
+
+ errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+ for (Throwable t : e.getSuppressed()) {
+ errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.zeromq.ZMQ;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * Provides a ZeroMQ Context object
+ */
+public class Context {
+ private ZMQ.Context zmqContext = ZMQ.context(1);
+ private String uri;
+
+ private static Context _instance = new Context();
+
+ private Context() {}
+
+ public static Context getInstance(){
+ return _instance;
+ }
+
+ public ZMQ.Context getZmqContext(){
+ return this.zmqContext;
+ }
+
+ public String getLocalUri(){
+ uri = (uri != null) ? uri
+ : new StringBuilder("tcp://").append(getIpAddress()).append(":")
+ .append(getRpcPort()).toString();
+
+ return uri;
+ }
+
+ public String getRpcPort(){
+ String rpcPort = (System.getProperty("rpc.port") != null)
+ ? System.getProperty("rpc.port")
+ : "5554";
+
+ return rpcPort;
+ }
+
+ private String getIpAddress(){
+ String ipAddress = (System.getProperty("local.ip") != null)
+ ? System.getProperty("local.ip")
+ : findIpAddress();
+
+ return ipAddress;
+ }
+
+ /**
+ * Finds IPv4 address of the local VM
+ * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+ * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+ * Should we use IP or hostname?
+ *
+ * @return
+ */
+ private String findIpAddress() {
+ String hostAddress = null;
+ Enumeration e = null;
+ try {
+ e = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e1) {
+ e1.printStackTrace();
+ }
+ while (e.hasMoreElements()) {
+
+ NetworkInterface n = (NetworkInterface) e.nextElement();
+
+ Enumeration ee = n.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress i = (InetAddress) ee.nextElement();
+ if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+ hostAddress = i.getHostAddress();
+ }
+ }
+ return hostAddress;
+
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+
+public interface RemoteRpcClient extends RpcImplementation,AutoCloseable{
+
+
+ void setRoutingTableProvider(RoutingTableProvider provider);
+
+ void stop();
+
+ void start();
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.connector.remoterpc.Client;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public class RemoteRpcProvider implements
+ RemoteRpcServer,
+ RemoteRpcClient,
+ Provider {
+
+ private final ServerImpl server;
+ private final Client client;
+ private RoutingTableProvider provider;
+
+ @Override
+ public void setRoutingTableProvider(RoutingTableProvider provider) {
+ this.provider = provider;
+ server.setRoutingTableProvider(provider);
+ client.setRoutingTableProvider(provider);
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ return client.invokeRpc(rpc, input);
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return client.getSupportedRpcs();
+ }
+
+
+ public RemoteRpcProvider(ServerImpl server, Client client) {
+ this.server = server;
+ this.client = client;
+ }
+
+ public void setBrokerSession(ProviderSession session) {
+ server.setBrokerSession(session);
+ }
+ public void setServerPool(ExecutorService serverPool) {
+ server.setServerPool(serverPool);
+ }
+ public void start() {
+ client.setRoutingTableProvider(provider);
+ server.setRoutingTableProvider(provider);
+ server.start();
+ client.start();
+ }
+ public void onRouteUpdated(String key, Set values) {
+ server.onRouteUpdated(key, values);
+ }
+ public void onRouteDeleted(String key) {
+ server.onRouteDeleted(key);
+ }
+
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ server.setBrokerSession(session);
+ start();
+ }
+
+
+ public void close() throws Exception {
+ server.close();
+ client.close();
+ }
+
+
+
+
+ @Override
+ public void stop() {
+ server.stop();
+ client.stop();
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+
+public interface RemoteRpcServer extends AutoCloseable {
+
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+
+import com.google.common.base.Optional;
+
+public class RoutingTableProvider implements AutoCloseable {
+
+ @SuppressWarnings("rawtypes")
+ final ServiceTracker<RoutingTable,RoutingTable> tracker;
+
+
+ public RoutingTableProvider(BundleContext ctx) {
+ @SuppressWarnings("rawtypes")
+ ServiceTracker<RoutingTable, RoutingTable> rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null);
+ tracker = rawTracker;
+ tracker.open();
+ }
+
+ public Optional<RoutingTable<String, String>> getRoutingTable() {
+ @SuppressWarnings("unchecked")
+ RoutingTable<String,String> tracked = tracker.getService();
+ return Optional.fromNullable(tracked);
+ }
+
+ @Override
+ public void close() throws Exception {
+ tracker.close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}.
+ * It adds following capabilities:
+ * <li> Retry logic - Tries 3 times before giving up
+ * <li> Request times out after {@link TIMEOUT} property
+ * <li> The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before
+ * the response for the 1st request is received. To overcome that, this socket queues all messages until
+ * the previous request has been responded.
+ */
+public class RpcSocket {
+
+ // Constants
+ public static final int TIMEOUT = 2000;
+ public static final int QUEUE_SIZE = 10;
+ public static final int NUM_RETRIES = 3;
+ private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
+
+ private ZMQ.Socket socket;
+ private ZMQ.Poller poller;
+ private String address;
+ private SocketState state;
+ private long sendTime;
+ private int retriesLeft;
+ private LinkedBlockingQueue<MessageWrapper> inQueue;
+
+
+ public RpcSocket(String address, ZMQ.Poller poller) {
+ this.socket = null;
+ this.state = new IdleSocketState();
+ this.sendTime = -1;
+ this.retriesLeft = NUM_RETRIES;
+ this.inQueue = new LinkedBlockingQueue<MessageWrapper>(QUEUE_SIZE);
+ this.address = address;
+ this.poller = poller;
+ createSocket();
+ }
+
+ public ZMQ.Socket getSocket() {
+ return socket;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public int getRetriesLeft() {
+ return retriesLeft;
+ }
+
+ public void setRetriesLeft(int retriesLeft) {
+ this.retriesLeft = retriesLeft;
+ }
+
+ public SocketState getState() {
+ return state;
+ }
+
+ public void setState(SocketState state) {
+ this.state = state;
+ }
+
+ public int getQueueSize() {
+ return inQueue.size();
+ }
+
+ public MessageWrapper removeCurrentRequest() {
+ return inQueue.poll();
+ }
+
+ public boolean hasTimedOut() {
+ return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
+ }
+
+ public void send(MessageWrapper request) throws TimeoutException {
+ try {
+ boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!success) {
+ throw new TimeoutException("send :: Queue is full");
+ }
+ process();
+ }
+ catch (InterruptedException e) {
+ log.error("send : Thread interrupted while attempting to add request to inQueue", e);
+ }
+ }
+
+ public MessageWrapper receive() {
+ Message response = parseMessage();
+ MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
+ MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
+
+ state = new IdleSocketState();
+ retriesLeft = NUM_RETRIES;
+ return responseMessageWrapper;
+ }
+
+ public void process() {
+ if (getQueueSize() > 0) //process if there's message in the queue
+ state.process(this);
+ }
+
+ // Called by IdleSocketState & BusySocketState
+ public void sendMessage() {
+ //Get the message from queue without removing it. For retries
+ MessageWrapper messageWrapper = inQueue.peek();
+ if (messageWrapper != null) {
+ Message message = messageWrapper.getMessage();
+ try {
+ socket.send(Message.serialize(message));
+ }
+ catch (IOException e) {
+ log.debug("Message send failed [{}]", message);
+ log.debug("Exception [{}]", e);
+ }
+ sendTime = System.currentTimeMillis();
+ }
+ }
+
+ public Message parseMessage() {
+ Message parsedMessage = null;
+ byte[] bytes = socket.recv();
+ log.debug("Received bytes:[{}]", bytes.length);
+ try {
+ parsedMessage = (Message)Message.deserialize(bytes);
+ }
+ catch (IOException|ClassNotFoundException e) {
+ log.debug("parseMessage : Deserializing received bytes failed", e);
+ }
+
+ return parsedMessage;
+ }
+
+ public void recycleSocket() {
+ close();
+ }
+
+ public void close() {
+ socket.setLinger(10);
+ socket.close();
+ }
+
+ private void createSocket() {
+ socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
+ socket.connect(address);
+ poller.register(socket, ZMQ.Poller.POLLIN);
+ state = new IdleSocketState();
+ }
+
+
+ /**
+ * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
+ */
+ public static interface SocketState {
+
+ /* The processing actions to be performed in this state
+ */
+ public void process(RpcSocket socket);
+ }
+
+ /**
+ * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
+ */
+ public static class IdleSocketState implements SocketState {
+
+ @Override
+ public void process(RpcSocket socket) {
+ socket.sendMessage();
+ socket.setState(new BusySocketState());
+ socket.setRetriesLeft(socket.getRetriesLeft()-1);
+ }
+ }
+
+ /**
+ * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
+ */
+ public static class BusySocketState implements SocketState {
+
+ private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
+
+ @Override
+ public void process(RpcSocket socket) {
+ if (socket.hasTimedOut()) {
+ if (socket.getRetriesLeft() > 0) {
+ log.debug("process : Request timed out, retrying now...");
+ socket.sendMessage();
+ socket.setRetriesLeft(socket.getRetriesLeft() - 1);
+ }
+ else {
+ // No more retries for current request, so stop processing the current request
+ MessageWrapper message = socket.removeCurrentRequest();
+ if (message != null) {
+ log.error("Unable to process rpc request [{}]", message);
+ socket.setState(new IdleSocketState());
+ socket.setRetriesLeft(NUM_RETRIES);
+ }
+ }
+ }
+ // Else no timeout, so allow processing to continue
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * Main server thread for sending requests.
+ */
+public class Sender implements Runnable{
+
+ private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
+ private final Client client;
+
+
+
+
+ public Sender(Client client) {
+ super();
+ this.client = client;
+ }
+
+@Override
+ public void run() {
+ _logger.info("Starting...");
+
+ try (SocketManager socketManager = new SocketManager()){
+ while (!Thread.currentThread().isInterrupted()) {
+
+ //read incoming messages from blocking queue
+ MessageWrapper request = pollForRequest();
+
+ if (request != null) {
+ processRequest(socketManager, request);
+ }
+
+ flushSockets(socketManager);
+ pollForResponse(socketManager);
+ processResponse(socketManager);
+
+ }
+ } catch(Exception t){
+ _logger.error("Exception: [{}]", t);
+ _logger.error("Stopping...");
+ }
+ }
+
+ private void processResponse(SocketManager socketManager) {
+ for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
+ // If any sockets get a response, process it
+ if (socketManager.getPoller().pollin(i)) {
+ Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
+ socketManager.getPoller().getItem(i).getSocket());
+
+ checkState(socket.isPresent(), "Managed socket not found");
+
+ MessageWrapper response = socket.get().receive();
+ _logger.debug("Received rpc response [{}]", response.getMessage());
+
+ //TODO: handle exception and introduce timeout on receiver side
+ try {
+ response.getReceiveSocket().send(Message.serialize(response.getMessage()));
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+
+ private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
+
+ if ((request.getMessage() == null) ||
+ (request.getMessage().getRecipient() == null)) {
+ //invalid message. log and drop
+ _logger.error("Invalid request [{}]", request);
+ return;
+ }
+
+ RpcSocket socket =
+ socketManager.getManagedSocket(request.getMessage().getRecipient());
+
+ socket.send(request);
+ }
+
+ private void flushSockets(SocketManager socketManager){
+ for (RpcSocket socket : socketManager.getManagedSockets()){
+ socket.process();
+ }
+ }
+
+ private MessageWrapper pollForRequest(){
+ return client.getRequestQueue().poll();
+ }
+
+ private void pollForResponse(SocketManager socketManager){
+ try{
+ socketManager.getPoller().poll(10); //poll every 10ms
+ }catch (Throwable t) { /*Ignore and continue*/ }
+ }
+}
+
+
+/*
+SCALA
+
+package org.opendaylight.controller.sal.connector.remoterpc
+
+ import org.slf4j.{LoggerFactory, Logger}
+ import scala.collection.JavaConverters._
+ import scala.Some
+ import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
+*/
+/**
+ * Main server thread for sending requests. This does not maintain any state. If the
+ * thread dies, it will be restarted
+ */
+/*class Sender extends Runnable {
+ private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
+
+ override def run = {
+ _logger.info("Sender starting...")
+ val socketManager = new SocketManager()
+
+ try {
+ while (!Thread.currentThread().isInterrupted) {
+ //read incoming messages from blocking queue
+ val request: MessageWrapper = Client.requestQueue.poll()
+
+ if (request != null) {
+ if ((request.message != null) &&
+ (request.message.getRecipient != null)) {
+
+ val socket = socketManager.getManagedSocket(request.message.getRecipient)
+ socket.send(request)
+ } else {
+ //invalid message. log and drop
+ _logger.error("Invalid request [{}]", request)
+ }
+ }
+
+ socketManager.getManagedSockets().asScala.map(s => s.process)
+
+ // Poll all sockets for responses every 1 sec
+ poll(socketManager)
+
+ // If any sockets get a response, process it
+ for (i <- 0 until socketManager.poller.getSize) {
+ if (socketManager.poller.pollin(i)) {
+ val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
+
+ socket match {
+ case None => //{
+ _logger.error("Could not find a managed socket for zmq socket")
+ throw new IllegalStateException("Could not find a managed socket for zmq socket")
+ //}
+ case Some(s) => {
+ val response = s.receive()
+ _logger.debug("Received rpc response [{}]", response.message)
+ response.receiveSocket.send(Message.serialize(response.message))
+ }
+ }
+ }
+ }
+
+ }
+ } catch{
+ case e:Exception => {
+ _logger.debug("Sender stopping due to exception")
+ e.printStackTrace()
+ }
+ } finally {
+ socketManager.stop
+ }
+ }
+
+ def poll(socketManager:SocketManager) = {
+ try{
+ socketManager.poller.poll(10)
+ }catch{
+ case t:Throwable => //ignore and continue
+ }
+ }
+}
+
+
+// def newThread(r: Runnable): Thread = {
+// val t = new RequestHandler()
+// t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
+// t
+// }
+
+
+
+/**
+ * Restarts the request processing server in the event of unforeseen exceptions
+ */
+//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
+// def uncaughtException(t: Thread, e: Throwable) = {
+// _logger.error("Exception caught during request processing [{}]", e)
+// _logger.info("Restarting request processor server...")
+// RequestProcessor.start()
+// }
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
+ * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
+ * from config file using existing(?) ODL properties framework
+ */
+public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
+
+ private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+
+ private ExecutorService serverPool;
+
+ // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
+ private RoutingTableProvider routingTable;
+ private Set<QName> remoteServices;
+ private ProviderSession brokerSession;
+ private ZMQ.Context context;
+ private ZMQ.Socket replySocket;
+
+ private final RpcListener listener = new RpcListener();
+
+ private final String localUri = Context.getInstance().getLocalUri();
+
+ private final int rpcPort;
+
+ private RpcImplementation client;
+
+ public RpcImplementation getClient() {
+ return client;
+ }
+
+ public void setClient(RpcImplementation client) {
+ this.client = client;
+ }
+
+ // Prevent instantiation
+ public ServerImpl(int rpcPort) {
+ this.rpcPort = rpcPort;
+ }
+
+ public void setBrokerSession(ProviderSession session) {
+ this.brokerSession = session;
+ }
+
+ public ExecutorService getServerPool() {
+ return serverPool;
+ }
+
+ public void setServerPool(ExecutorService serverPool) {
+ this.serverPool = serverPool;
+ }
+
+ public void start() {
+ context = ZMQ.context(1);
+ serverPool = Executors.newSingleThreadExecutor();
+ remoteServices = new HashSet<QName>();
+
+ // Start listening rpc requests
+ serverPool.execute(receive());
+
+ brokerSession.addRpcRegistrationListener(listener);
+ // routingTable.registerRouteChangeListener(routeChangeListener);
+
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
+ }
+
+ _logger.debug("RPC Server started [{}]", localUri);
+ }
+
+ public void stop() {
+ // TODO: un-subscribe
+
+ // if (context != null)
+ // context.term();
+ //
+ // _logger.debug("ZMQ Context is terminated.");
+
+ if (serverPool != null)
+ serverPool.shutdown();
+
+ _logger.debug("Thread pool is closed.");
+ }
+
+ private Runnable receive() {
+ return new Runnable() {
+ public void run() {
+
+ // Bind to RPC reply socket
+ replySocket = context.socket(ZMQ.REP);
+ replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
+
+ // Poller enables listening on multiple sockets using a single
+ // thread
+ ZMQ.Poller poller = new ZMQ.Poller(1);
+ poller.register(replySocket, ZMQ.Poller.POLLIN);
+ try {
+ // TODO: Add code to restart the thread after exception
+ while (!Thread.currentThread().isInterrupted()) {
+
+ poller.poll();
+
+ if (poller.pollin(0)) {
+ handleRpcCall();
+ }
+ }
+ } catch (Exception e) {
+ // log and continue
+ _logger.error("Unhandled exception [{}]", e);
+ } finally {
+ poller.unregister(replySocket);
+ replySocket.close();
+ }
+
+ }
+ };
+ }
+
+ /**
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ private void handleRpcCall() {
+
+ Message request = parseMessage(replySocket);
+
+ _logger.debug("Received rpc request [{}]", request);
+
+ // Call broker to process the message then reply
+ Future<RpcResult<CompositeNode>> rpc = null;
+ RpcResult<CompositeNode> result = null;
+ try {
+ rpc = brokerSession.rpc((QName) request.getRoute().getType(),
+ XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+
+ result = (rpc != null) ? rpc.get() : null;
+
+ } catch (Exception e) {
+ _logger.debug("Broker threw [{}]", e);
+ }
+
+ CompositeNode payload = (result != null) ? result.getResult() : null;
+
+ Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
+ .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
+
+ _logger.debug("Sending rpc response [{}]", response);
+
+ try {
+ replySocket.send(Message.serialize(response));
+ } catch (Exception e) {
+ _logger.debug("rpc response send failed for message [{}]", response);
+ _logger.debug("{}", e);
+ }
+
+ }
+
+ /**
+ * @param socket
+ * @return
+ */
+ private Message parseMessage(ZMQ.Socket socket) {
+
+ Message msg = null;
+ try {
+ byte[] bytes = socket.recv();
+ _logger.debug("Received bytes:[{}]", bytes.length);
+ msg = (Message) Message.deserialize(bytes);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ return msg;
+ }
+
+ @Override
+ public void onRouteUpdated(String key, Set values) {
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ try {
+ _logger.debug("Updating key/value {}-{}", key, values);
+ brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
+
+ } catch (Exception e) {
+ _logger.info("Route update failed {}", e);
+ }
+ }
+
+ @Override
+ public void onRouteDeleted(String key) {
+ // TODO: Broker session needs to be updated to support this
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Listener for rpc registrations
+ */
+ private class RpcListener implements RpcRegistrationListener {
+
+
+
+ @Override
+ public void onRpcImplementationAdded(QName name) {
+
+ // if the service name exists in the set, this notice
+ // has bounced back from the broker. It should be ignored
+ if (remoteServices.contains(name))
+ return;
+
+ _logger.debug("Adding registration for [{}]", name);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ try {
+ routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
+ _logger.debug("Route added [{}-{}]", name, localUri);
+ } catch (RoutingTableException | SystemException e) {
+ // TODO: This can be thrown when route already exists in the
+ // table. Broker
+ // needs to handle this.
+ _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+ }
+ }
+
+ @Override
+ public void onRpcImplementationRemoved(QName name) {
+
+ _logger.debug("Removing registration for [{}]", name);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ try {
+ routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route delete failed {}", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop();
+ }
+
+ public void setRoutingTableProvider(RoutingTableProvider provider) {
+ this.routingTable = provider;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller}
+ */
+public class SocketManager implements AutoCloseable{
+ private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
+
+ /*
+ * RpcSockets mapped by network address its connected to
+ */
+ private ConcurrentHashMap<String, RpcSocket> managedSockets = new ConcurrentHashMap<String, RpcSocket>();
+
+ private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically
+
+ /**
+ * Returns a {@link RpcSocket} for the given address
+ * @param address network address with port eg: 10.199.199.20:5554
+ * @return
+ */
+ public RpcSocket getManagedSocket(String address) throws IllegalArgumentException {
+ //Precondition
+ if (!address.matches("(tcp://)(.*)(:)(\\d*)")) {
+ throw new IllegalArgumentException("Address must of format 'tcp://<ip address>:<port>' but is " + address);
+ }
+
+ if (!managedSockets.containsKey(address)) {
+ log.debug("{} Creating new socket for {}", Thread.currentThread().getName());
+ RpcSocket socket = new RpcSocket(address, _poller);
+ managedSockets.put(address, socket);
+ }
+
+ return managedSockets.get(address);
+ }
+
+ /**
+ * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket}
+ * @param socket
+ * @return
+ */
+ public Optional<RpcSocket> getManagedSocketFor(ZMQ.Socket socket) {
+ for (RpcSocket rpcSocket : managedSockets.values()) {
+ if (rpcSocket.getSocket().equals(socket)) {
+ return Optional.of(rpcSocket);
+ }
+ }
+ return Optional.absent();
+ }
+
+ /**
+ * Return a collection of all managed sockets
+ * @return
+ */
+ public Collection<RpcSocket> getManagedSockets() {
+ return managedSockets.values();
+ }
+
+ /**
+ * Returns the {@link ZMQ.Poller}
+ * @return
+ */
+ public ZMQ.Poller getPoller() {
+ return _poller;
+ }
+
+ /**
+ * This should be called when stopping the server to close all the sockets
+ * @return
+ */
+ @Override
+ public void close() throws Exception {
+ log.debug("Stopping...");
+ for (RpcSocket socket : managedSockets.values()) {
+ socket.close();
+ }
+ managedSockets.clear();
+ log.debug("Stopped");
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.zeromq.ZMQ;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+public class SocketPair implements AutoCloseable{
+ private ZMQ.Socket sender;
+ private ZMQ.Socket receiver;
+
+ private static final String INPROC_PREFIX = "inproc://";
+
+ public SocketPair(){
+ String address = new StringBuilder(INPROC_PREFIX)
+ .append(UUID.randomUUID())
+ .toString();
+
+ receiver = Context.getInstance().getZmqContext().socket(ZMQ.PAIR);
+ receiver.bind(address);
+
+ sender = Context.getInstance().getZmqContext().socket(ZMQ.PAIR);
+ sender.connect(address);
+ }
+
+ public ZMQ.Socket getSender(){
+ return this.sender;
+ }
+
+ public ZMQ.Socket getReceiver(){
+ return this.receiver;
+ }
+
+ @Override
+ public void close() throws Exception {
+ sender.close();
+ receiver.close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.*;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CompositeNodeImpl implements CompositeNode, Serializable {
+
+ private QName key;
+ private List<Node<?>> children;
+
+ @Override
+ public List<Node<?>> getChildren() {
+ return children;
+ }
+
+ @Override
+ public List<CompositeNode> getCompositesByName(QName children) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CompositeNode> getCompositesByName(String children) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<SimpleNode<?>> getSimpleNodesByName(QName children) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<SimpleNode<?>> getSimpleNodesByName(String children) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public CompositeNode getFirstCompositeByName(QName container) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public SimpleNode<?> getFirstSimpleByName(QName leaf) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public MutableCompositeNode asMutable() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public QName getKey() {
+ return key; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<Node<?>> setValue(List<Node<?>> value) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public int size() {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<Node<?>> get(Object key) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<Node<?>> put(QName key, List<Node<?>> value) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<Node<?>> remove(Object key) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void putAll(Map<? extends QName, ? extends List<Node<?>>> m) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void clear() {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public Set<QName> keySet() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public Collection<List<Node<?>>> values() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public Set<Entry<QName, List<Node<?>>>> entrySet() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public QName getNodeType() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public CompositeNode getParent() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public List<Node<?>> getValue() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public ModifyAction getModificationAction() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
-
-import org.codehaus.jackson.map.ObjectMapper;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import java.io.*;
ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration
HEARTBEAT((byte) 1),
REQUEST((byte) 2),
- RESPONSE((byte) 3);
+ RESPONSE((byte) 3),
+ ERROR((byte)4);
private final byte type;
private MessageType type;
private String sender;
+ private String recipient;
private RpcRouter.RouteIdentifier route;
private Object payload;
this.payload = payload;
}
+ public String getRecipient() {
+ return recipient;
+ }
+
+ public void setRecipient(String recipient) {
+ this.recipient = recipient;
+ }
@Override
public String toString() {
return "Message{" +
"type=" + type +
", sender='" + sender + '\'' +
+ ", recipient='" + recipient + '\'' +
", route=" + route +
", payload=" + payload +
'}';
return o.readObject();
}
- public static byte[] toJsonBytes(Message m) throws IOException {
- ObjectMapper o = new ObjectMapper();
- return o.writeValueAsBytes(m);
- }
-
- public static Message fromJsonBytes(byte [] bytes) throws IOException {
-
- ObjectMapper o = new ObjectMapper();
- return o.readValue(bytes, Message.class);
- }
-
public static class Response extends Message implements RpcRouter.RpcReply {
private ResponseCode code; // response code
return this;
}
+ public MessageBuilder recipient(String recipient){
+ message.setRecipient(recipient);
+ return this;
+ }
+
public MessageBuilder route(RpcRouter.RouteIdentifier route){
message.setRoute(route);
return this;
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
+
+import org.zeromq.ZMQ;
+
+/**
+ * A class encapsulating {@link Message} and the {@link ZMQ.Socket} over which it is transmitted
+ */
+public class MessageWrapper {
+
+ private Message _message;
+ private ZMQ.Socket _receiveSocket;
+
+ public MessageWrapper(Message message, ZMQ.Socket receiveSocket) {
+ this._message = message;
+ this._receiveSocket = receiveSocket;
+ }
+
+ public Message getMessage() {
+ return _message;
+ }
+
+ public ZMQ.Socket getReceiveSocket() {
+ return _receiveSocket;
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import java.io.Serializable;
+import java.net.URI;
-/**
- * User: abhishk2
- */
public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
+ transient ObjectMapper mapper = new ObjectMapper();
+
private QName context;
private QName type;
private InstanceIdentifier route;
@Override
public String toString() {
- return "RouteIdentifierImpl{" +
- "context=" + context +
- ", type=" + type +
- ", route=" + route +
- '}';
+ try {
+ return mapper.writeValueAsString(this);
+ } catch (Throwable e) {
+ //do nothing
+ }
+
+ return super.toString();
+ }
+
+ public RpcRouter.RouteIdentifier fromString(String input)
+ throws Exception {
+
+ JsonNode root = mapper.readTree(input);
+ this.context = parseQName(root.get("context"));
+ this.type = parseQName(root.get("type"));
+
+ return this;
+ }
+
+ private QName parseQName(JsonNode node){
+ if (node == null) return null;
+
+ String namespace = (node.get("namespace") != null) ?
+ node.get("namespace").asText() : "";
+
+ String localName = (node.get("localName") != null) ?
+ node.get("localName").asText() : "";
+
+ URI uri = URI.create(namespace);
+ return new QName(uri, localName);
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.dto;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc.util;
+
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.NodeUtils;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+public class XmlUtils {
+
+ private static final Logger _logger = LoggerFactory.getLogger(XmlUtils.class);
+
+ public static String compositeNodeToXml(CompositeNode cNode){
+ if (cNode == null) return new String();
+
+ Document domTree = NodeUtils.buildShadowDomTree(cNode);
+ StringWriter writer = new StringWriter();
+ try {
+ TransformerFactory tf = TransformerFactory.newInstance();
+ Transformer transformer = tf.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+ transformer.transform(new DOMSource(domTree), new StreamResult(writer));
+ } catch (TransformerException e) {
+ _logger.error("Error during translation of Document to OutputStream", e);
+ }
+
+ return writer.toString();
+ }
+
+ public static CompositeNode xmlToCompositeNode(String xml){
+ if (xml==null || xml.length()==0) return null;
+
+ Node<?> dataTree;
+ try {
+ dataTree = XmlTreeBuilder.buildDataTree(new ByteArrayInputStream(xml.getBytes()));
+ } catch (XMLStreamException e) {
+ _logger.error("Error during building data tree from XML", e);
+ return null;
+ }
+ if (dataTree == null) {
+ _logger.error("data tree is null");
+ return null;
+ }
+ if (dataTree instanceof SimpleNode) {
+ _logger.error("RPC XML was resolved as SimpleNode");
+ return null;
+ }
+ return (CompositeNode) dataTree;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc
+
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import java.util
+import java.util.{UUID, Collections}
+import org.zeromq.ZMQ
+import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
+import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
+import Message.MessageType
+import java.util.concurrent._
+import java.lang.InterruptedException
+
+
+/**
+ * An implementation of {@link RpcImplementation} that makes
+ * remote RPC calls
+ */
+class Client extends RemoteRpcClient {
+
+ private val _logger = LoggerFactory.getLogger(this.getClass);
+
+ val requestQueue = new LinkedBlockingQueue[MessageWrapper](100)
+ val pool: ExecutorService = Executors.newSingleThreadExecutor()
+ private val TIMEOUT = 5000 //in ms
+ var routingTableProvider: RoutingTableProvider = null
+
+
+ def getInstance = this
+
+
+ def setRoutingTableProvider(provider : RoutingTableProvider) = {
+ routingTableProvider = provider;
+ }
+
+ def getSupportedRpcs: util.Set[QName] = {
+ Collections.emptySet()
+ }
+
+ def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
+
+ val routeId = new RouteIdentifierImpl()
+ routeId.setType(rpc)
+
+ //lookup address for the rpc request
+ val routingTable = routingTableProvider.getRoutingTable()
+ require( routingTable != null, "Routing table not found. Exiting" )
+
+ val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
+ require(addresses != null, "Address not found for rpc " + rpc);
+ require(addresses.size() == 1) //its a global service.
+
+ val address = addresses.iterator().next()
+ require(address != null, "Address is null")
+
+ //create in-process "pair" socket and pass it to sender thread
+ //Sender replies on this when result is available
+ val inProcAddress = "inproc://" + UUID.randomUUID()
+ val receiver = Context.zmqContext.socket(ZMQ.PAIR)
+ receiver.bind(inProcAddress);
+
+ val sender = Context.zmqContext.socket(ZMQ.PAIR)
+ sender.connect(inProcAddress)
+
+ val requestMessage = new Message.MessageBuilder()
+ .`type`(MessageType.REQUEST)
+ //.sender("tcp://localhost:8081")
+ .recipient(address)
+ .route(routeId)
+ .payload(input)
+ .build()
+
+ _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
+
+ val messageWrapper = new MessageWrapper(requestMessage, sender)
+ val errors = new util.ArrayList[RpcError]
+
+ try {
+ process(messageWrapper)
+ val response = parseMessage(receiver)
+
+ return Rpcs.getRpcResult(
+ true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
+
+ } catch {
+ case e: Exception => {
+ errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
+ return Rpcs.getRpcResult(false, null, errors)
+ }
+ } finally {
+ receiver.close();
+ sender.close();
+ }
+
+ }
+
+ /**
+ * Block on socket for reply
+ * @param receiver
+ * @return
+ */
+ private def parseMessage(receiver:ZMQ.Socket): Message = {
+ val bytes = receiver.recv()
+ return Message.deserialize(bytes).asInstanceOf[Message]
+ }
+
+ def start() = {
+ pool.execute(new Sender)
+ }
+
+ def process(msg: MessageWrapper) = {
+ _logger.debug("Processing message [{}]", msg)
+ val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
+
+ if (!success) throw new TimeoutException("Queue is full");
+
+ }
+
+ def stop() = {
+ pool.shutdown() //intiate shutdown
+ _logger.debug("Client stopping...")
+ // Context.zmqContext.term();
+ // _logger.debug("ZMQ context terminated")
+
+ try {
+
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ _logger.error("Client thread pool did not shut down");
+ }
+ } catch {
+ case ie:InterruptedException =>
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ _logger.debug("Client stopped")
+ }
+
+ def close() = {
+ stop();
+ }
+}
--- /dev/null
+module odl-sal-dom-rpc-remote-cfg {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc";
+ prefix "rpc-cluster";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import opendaylight-md-sal-dom {prefix dom;}
+
+ description
+ "Service definition for Binding Aware MD-SAL.";
+
+ revision "2013-10-28" {
+ description
+ "Initial revision";
+ }
+
+ identity remote-rpc-server {
+ base config:service-type;
+ config:java-class "org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer";
+ }
+
+ identity remote-rpc-client {
+ base config:service-type;
+ config:java-class "org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient";
+ }
+
+ identity remote-zeromq-rpc-server {
+ base config:module-type;
+ config:provided-service remote-rpc-server;
+ config:provided-service remote-rpc-client;
+ config:java-name-prefix ZeroMQServer;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case remote-zeromq-rpc-server {
+ when "/config:modules/config:module/config:type = 'remote-zeromq-rpc-server'";
+
+ container dom-broker {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity dom:dom-broker-osgi-registry;
+ }
+ }
+ }
+
+ leaf port {
+ type uint16;
+ }
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+
+import java.util.concurrent.TimeoutException;
+
+public class ClientTest {
+
+ Client client;
+
+ @Before
+ public void setup(){
+ client = new Client();
+ client.getRequestQueue().clear();
+ }
+
+ @Test
+ public void testStop() throws Exception {
+
+ }
+
+ @Test
+ public void testPool() throws Exception {
+
+ }
+
+ @Test
+ public void process_AddAMessage_ShouldAddToQueue() throws Exception {
+ client.process(getEmptyMessageWrapper());
+ Assert.assertEquals(1, client.getRequestQueue().size());
+ }
+
+ /**
+ * Queue size is 100. Adding 101 message should time out in 2 sec
+ * if server does not process it
+ * @throws Exception
+ */
+ @Test(expected = TimeoutException.class)
+ public void process_Add101Message_ShouldThrow() throws Exception {
+ for (int i=0;i<101;i++){
+ client.process(getEmptyMessageWrapper());
+ }
+ }
+
+ @Test
+ public void testStart() throws Exception {
+ }
+
+ private MessageWrapper getEmptyMessageWrapper(){
+ return new MessageWrapper(new Message(), null);
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.codehaus.jackson.JsonParseException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class RouteIdentifierImplTest {
+
+ Logger _logger = LoggerFactory.getLogger(RouteIdentifierImplTest.class);
+
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "heartbeat");
+
+ @Test
+ public void testToString() throws Exception {
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ rId.setType(QNAME);
+
+ _logger.debug(rId.toString());
+
+ Assert.assertTrue(true);
+
+ }
+
+ @Test
+ public void testFromString() throws Exception {
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ rId.setType(QNAME);
+
+ _logger.debug("route: " + rId.fromString(rId.toString()));
+
+ Assert.assertTrue(true);
+ }
+
+ @Test(expected = JsonParseException.class)
+ public void testFromInvalidString() throws Exception {
+ String invalidInput = "aklhdgadfa;;;;;;;]]]]=]ag" ;
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ rId.fromString(invalidInput);
+
+ _logger.debug("" + rId);
+ Assert.assertTrue(true);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Mockito.doNothing;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RpcSocket.class)
+public class RpcSocketTest {
+ RpcSocket rpcSocket = new RpcSocket("tcp://localhost:5554", new ZMQ.Poller(1));
+ RpcSocket spy = PowerMockito.spy(rpcSocket);
+
+ @Test
+ public void testCreateSocket() throws Exception {
+ Assert.assertEquals("tcp://localhost:5554", spy.getAddress());
+ Assert.assertEquals(ZMQ.REQ, spy.getSocket().getType());
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void send_WhenQueueGetsFull_ShouldThrow() throws Exception {
+
+ doNothing().when(spy).process();
+
+ //10 is queue size
+ for (int i=0;i<10;i++){
+ spy.send(getEmptyMessageWrapper());
+ }
+
+ //sending 11th message should throw
+ spy.send(getEmptyMessageWrapper());
+ }
+
+ @Test
+ public void testHasTimedOut() throws Exception {
+ spy.send(getEmptyMessageWrapper());
+ Assert.assertFalse(spy.hasTimedOut());
+ Thread.sleep(1000);
+ Assert.assertFalse(spy.hasTimedOut());
+ Thread.sleep(1000);
+ Assert.assertTrue(spy.hasTimedOut());
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ PowerMockito.doNothing().when(spy, "sendMessage");
+ spy.send(getEmptyMessageWrapper());
+
+ //Next message should get queued
+ spy.send(getEmptyMessageWrapper());
+
+ //queue size should be 2
+ Assert.assertEquals(2, spy.getQueueSize());
+
+
+ spy.process();
+ //sleep for 2 secs (timeout)
+ //message send would be retried
+ Thread.sleep(2000);
+ spy.process();
+ Thread.sleep(2000);
+ spy.process();
+ Thread.sleep(2000);
+ spy.process(); //retry fails, next message will get picked up
+ Assert.assertEquals(1, spy.getQueueSize());
+ }
+
+ @Test
+ public void testProcessStateTransitions() throws Exception {
+ PowerMockito.doNothing().when(spy, "sendMessage");
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ spy.send(getEmptyMessageWrapper());
+ Assert.assertEquals(1, spy.getQueueSize());
+ Thread.sleep(200);
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+ Thread.sleep(1800);
+
+ //1st timeout, 2nd try
+ spy.process();
+ Thread.sleep(200);
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+ Thread.sleep(1800);
+
+ //2nd timeout, 3rd try
+ spy.process();
+ Thread.sleep(200);
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+ Thread.sleep(1800);
+
+ //3rd timeout, no more tries => remove
+ spy.process();
+ Thread.sleep(200);
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ Assert.assertEquals(0, spy.getQueueSize());
+ }
+
+ @Test
+ public void testParseMessage() throws Exception {
+ // Write an integration test for parseMessage
+ }
+
+ @Test
+ public void testRecycleSocket() throws Exception {
+ // This will need to be updated in the future...for now, recycleSocket() calls close()
+ Assert.assertTrue(spy.getSocket().base().check_tag());
+ spy.close();
+ Assert.assertEquals(10, spy.getSocket().getLinger());
+ Assert.assertFalse(spy.getSocket().base().check_tag());
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ Assert.assertTrue(spy.getSocket().base().check_tag());
+ spy.close();
+ Assert.assertEquals(10, spy.getSocket().getLinger());
+ Assert.assertFalse(spy.getSocket().base().check_tag());
+ }
+
+ @Test
+ public void testReceive() throws Exception {
+ PowerMockito.doReturn(null).when(spy, "parseMessage");
+ PowerMockito.doNothing().when(spy, "process");
+ spy.send(getEmptyMessageWrapper());
+
+ //There should be 1 message waiting in the queue
+ Assert.assertEquals(1, spy.getQueueSize());
+
+ spy.receive();
+ //This should complete message processing
+ //The message should be removed from the queue
+ Assert.assertEquals(0, spy.getQueueSize());
+ Assert.assertEquals(RpcSocket.NUM_RETRIES, spy.getRetriesLeft());
+
+ }
+
+ @Test
+ public void testReceiveStateTransitions() throws Exception {
+ PowerMockito.doReturn(null).when(spy, "parseMessage");
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ spy.send(getEmptyMessageWrapper());
+
+ //There should be 1 message waiting in the queue
+ Assert.assertEquals(1, spy.getQueueSize());
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+
+ spy.receive();
+ //This should complete message processing
+ //The message should be removed from the queue
+ Assert.assertEquals(0, spy.getQueueSize());
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ }
+
+ private MessageWrapper getEmptyMessageWrapper(){
+ return new MessageWrapper(new Message(), null);
+ }
+
+ @Test
+ public void testProcessReceiveSequence() throws Exception {
+ PowerMockito.doNothing().when(spy, "sendMessage");
+ PowerMockito.doReturn(null).when(spy, "parseMessage");
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ spy.send(getEmptyMessageWrapper());
+ spy.send(getEmptyMessageWrapper());
+ Assert.assertEquals(2, spy.getQueueSize());
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+
+
+ Thread.sleep(2000);
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+ spy.receive();
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ Assert.assertEquals(1, spy.getQueueSize());
+
+ spy.process();
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
+ spy.receive();
+ Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
+ Assert.assertEquals(0, spy.getQueueSize());
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.NodeUtils;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.StringWriter;
+
+public class SerilizationTest {
+
+ private static final Logger _logger = LoggerFactory.getLogger(SerilizationTest.class);
+
+ public void fromXml() {
+ }
+
+ @Test
+ public void toXml() throws FileNotFoundException {
+
+ InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+ StringWriter writer = new StringWriter();
+
+ CompositeNode data = loadCompositeNode(xmlStream);
+ Document domTree = NodeUtils.buildShadowDomTree(data);
+ try {
+ TransformerFactory tf = TransformerFactory.newInstance();
+ Transformer transformer = tf.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+ //transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+ //transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ //transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+ //transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+ transformer.transform(new DOMSource(domTree), new StreamResult(writer));
+ } catch (TransformerException e) {
+ _logger.error("Error during translation of Document to OutputStream", e);
+ }
+
+ _logger.info("Parsed xml [{}]", writer.toString());
+ }
+
+ //Note to self: Stolen from TestUtils
+ ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
+ // Figure out how to include TestUtils through pom ...was getting errors
+ private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
+ if (xmlInputStream == null) {
+ throw new IllegalArgumentException();
+ }
+ Node<?> dataTree;
+ try {
+ dataTree = XmlTreeBuilder.buildDataTree(xmlInputStream);
+ } catch (XMLStreamException e) {
+ _logger.error("Error during building data tree from XML", e);
+ return null;
+ }
+ if (dataTree == null) {
+ _logger.error("data tree is null");
+ return null;
+ }
+ if (dataTree instanceof SimpleNode) {
+ _logger.error("RPC XML was resolved as SimpleNode");
+ return null;
+ }
+ return (CompositeNode) dataTree;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.zeromq.ZMQ;
+import org.opendaylight.controller.sal.connector.remoterpc.SocketManager;
+import org.opendaylight.controller.sal.connector.remoterpc.RpcSocket;
+import org.opendaylight.controller.sal.connector.remoterpc.Context;
+import org.junit.Test;
+
+public class SocketManagerTest {
+
+ SocketManager manager;
+
+ @Before
+ public void setup(){
+ manager = new SocketManager();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ manager.close();
+ }
+
+ @Test
+ public void getManagedSockets_When2NewAdded_ShouldContain2() throws Exception {
+
+ //Prepare data
+ manager.getManagedSocket("tcp://localhost:5554");
+ manager.getManagedSocket("tcp://localhost:5555");
+
+ Assert.assertTrue( 2 == manager.getManagedSockets().size());
+ }
+
+ @Test
+ public void getManagedSockets_When2NewAddedAnd1Existing_ShouldContain2() throws Exception {
+
+ //Prepare data
+ manager.getManagedSocket("tcp://localhost:5554");
+ manager.getManagedSocket("tcp://localhost:5555");
+ manager.getManagedSocket("tcp://localhost:5554"); //ask for the first one
+
+ Assert.assertTrue( 2 == manager.getManagedSockets().size());
+ }
+
+ @Test
+ public void getManagedSocket_WhenPassedAValidAddress_ShouldReturnARpcSocket() throws Exception {
+ String testAddress = "tcp://localhost:5554";
+ RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
+ Assert.assertEquals(testAddress, rpcSocket.getAddress());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void getManagedSocket_WhenPassedInvalidHostAddress_ShouldThrow() throws Exception {
+ String testAddress = "tcp://nonexistenthost:5554";
+ RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void getManagedSocket_WhenPassedInvalidAddress_ShouldThrow() throws Exception {
+ String testAddress = "xxx";
+ RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
+ }
+
+ @Test
+ public void getManagedSocket_WhenPassedAValidZmqSocket_ShouldReturnARpcSocket() throws Exception {
+ //Prepare data
+ String firstAddress = "tcp://localhost:5554";
+ RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
+ ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
+
+ String secondAddress = "tcp://localhost:5555";
+ RpcSocket secondRpcSocket = manager.getManagedSocket(secondAddress);
+ ZMQ.Socket secondZmqSocket = secondRpcSocket.getSocket();
+
+ Assert.assertEquals(firstRpcSocket, manager.getManagedSocketFor(firstZmqSocket).get());
+ Assert.assertEquals(secondRpcSocket, manager.getManagedSocketFor(secondZmqSocket).get());
+ }
+
+ @Test
+ public void getManagedSocket_WhenPassedNonManagedZmqSocket_ShouldReturnNone() throws Exception {
+ ZMQ.Socket nonManagedSocket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
+ nonManagedSocket.connect("tcp://localhost:5000");
+
+ //Prepare data
+ String firstAddress = "tcp://localhost:5554";
+ RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
+ ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
+
+ Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(nonManagedSocket) );
+ Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(null) );
+ }
+
+ @Test
+ public void stop_WhenCalled_ShouldEmptyManagedSockets() throws Exception {
+ manager.getManagedSocket("tcp://localhost:5554");
+ manager.getManagedSocket("tcp://localhost:5555");
+ Assert.assertTrue( 2 == manager.getManagedSockets().size());
+
+ manager.close();
+ Assert.assertTrue( 0 == manager.getManagedSockets().size());
+ }
+
+ @Test
+ public void poller_WhenCalled_ShouldReturnAnInstanceOfPoller() throws Exception {
+ Assert.assertTrue (manager.getPoller() instanceof ZMQ.Poller);
+ }
+
+}
--- /dev/null
+<rpc>
+ <name>eth0</name>
+ <type>ethernetCsmacd</type>
+ <enabled>false</enabled>
+ <description>some interface</description>
+</rpc>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>sal-test-parent</artifactId>
+ <artifactId>sal-remoterpc-connector-test-parent</artifactId>
<groupId>org.opendaylight.controller.tests</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
- <artifactId>zeromq-test-consumer</artifactId>
+ <artifactId>sal-remoterpc-connector-test-consumer</artifactId>
<packaging>bundle</packaging>
<scm>
<connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
<configuration>
<instructions>
<Bundle-Activator>org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer</Bundle-Activator>
- <Import-Package>
- org.opendaylight.controller.sal.core.api,
- org.opendaylight.yangtools.yang.common;version="[0.5,1)",
- org.opendaylight.yangtools.yang.data.api,
- </Import-Package>
</instructions>
</configuration>
</plugin>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-impl</artifactId>
+ <version>0.5.9-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
--- /dev/null
+package org.opendaylight.controller.sample.zeromq.consumer;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Hashtable;
+import java.util.concurrent.*;
+
+import org.opendaylight.controller.sal.core.api.AbstractConsumer;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+
+import javax.xml.stream.XMLStreamException;
+
+public class ExampleConsumer extends AbstractConsumer {
+
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "heartbeat");
+
+ private ConsumerSession session;
+
+ private ServiceRegistration<ExampleConsumer> thisReg;
+ private Logger _logger = LoggerFactory.getLogger(ExampleConsumer.class);
+
+ @Override
+ public void onSessionInitiated(ConsumerSession session) {
+ this.session = session;
+ }
+
+ public RpcResult<CompositeNode> invokeRpc(QName qname, CompositeNode input) {
+ _logger.info("Invoking RPC:[{}] with Input:[{}]", qname.getLocalName(), input);
+ RpcResult<CompositeNode> result = null;
+ Future<RpcResult<CompositeNode>> future = ExampleConsumer.this.session.rpc(qname, input);
+ try {
+ result = future.get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ _logger.info("Returning Result:[{}]", result);
+ return result;
+ }
+
+ @Override
+ protected void startImpl(BundleContext context){
+ thisReg = context.registerService(ExampleConsumer.class, this, new Hashtable<String,String>());
+ }
+ @Override
+ protected void stopImpl(BundleContext context) {
+ super.stopImpl(context);
+ thisReg.unregister();
+ }
+
+ public CompositeNode getValidCompositeNodeWithOneSimpleChild() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/OneSimpleChild.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ public CompositeNode getValidCompositeNodeWithTwoSimpleChildren() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/TwoSimpleChildren.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ public CompositeNode getValidCompositeNodeWithFourSimpleChildren() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/FourSimpleChildren.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ public CompositeNode getValidCompositeNodeWithOneSimpleOneCompositeChild() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/OneSimpleOneCompositeChild.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ public CompositeNode getValidCompositeNodeWithTwoCompositeChildren() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/TwoCompositeChildren.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ public CompositeNode getInvalidCompositeNodeSimpleChild() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/InvalidSimpleChild.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ public CompositeNode getInvalidCompositeNodeCompositeChild() throws FileNotFoundException {
+ InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/InvalidCompositeChild.xml");
+ return loadCompositeNode(xmlStream);
+ }
+
+ //Note to self: Stolen from TestUtils
+ ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
+ // Figure out how to include TestUtils through pom ...was getting errors
+ private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
+ if (xmlInputStream == null) {
+ throw new IllegalArgumentException();
+ }
+ Node<?> dataTree;
+ try {
+ dataTree = XmlTreeBuilder.buildDataTree(xmlInputStream);
+ } catch (XMLStreamException e) {
+ _logger.error("Error during building data tree from XML", e);
+ return null;
+ }
+ if (dataTree == null) {
+ _logger.error("data tree is null");
+ return null;
+ }
+ if (dataTree instanceof SimpleNode) {
+ _logger.error("RPC XML was resolved as SimpleNode");
+ return null;
+ }
+ return (CompositeNode) dataTree;
+ }
+}
--- /dev/null
+<rpc>
+ <name>eth0</name>
+ <type>ethernetCsmacd</type>
+ <enabled>false</enabled>
+ <description>some interface</description>
+</rpc>
--- /dev/null
+<rpc>
+ <innerinterface1>
+ <name>eth1</name>
+ <type>ethernet</type>
+ <enabled>false</enabled>
+ <description>some interface</description>
+ </innerinterface1>
+ <innerinterface2>
+ <name>error</name>
+ <type>ethernet</type>
+ <enabled>true</enabled>
+ <description>some interface</description>
+ </innerinterface2>
+</rpc>
--- /dev/null
+<rpc>
+ <name>error</name>
+</rpc>
--- /dev/null
+<rpc>
+ <name>eth0</name>
+</rpc>
--- /dev/null
+<rpc>
+ <name>eth0</name>
+ <innerinterface>
+ <name>eth1</name>
+ <type>ethernetCsmacd</type>
+ <enabled>false</enabled>
+ <description>some interface</description>
+ </innerinterface>
+</rpc>
--- /dev/null
+<rpc>
+ <innerinterface1>
+ <name>eth1</name>
+ <type>ethernet</type>
+ <enabled>false</enabled>
+ <description>some interface</description>
+ </innerinterface1>
+ <innerinterface2>
+ <name>eth2</name>
+ <type>ethernet</type>
+ <enabled>true</enabled>
+ <description>some interface</description>
+ </innerinterface2>
+</rpc>
--- /dev/null
+<rpc>
+ <name>eth0</name>
+ <type>ethernetCsmacd</type>
+</rpc>
\ No newline at end of file
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../..</relativePath>
+ </parent>
+ <packaging>pom</packaging>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>sal-remoterpc-connector-test-parent</artifactId>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+
+ <modules>
+ <module>consumer-service</module>
+ <module>provider-service</module>
+ <module>test-it</module>
+ <module>test-nb</module>
+ </modules>
+
+</project>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>sal-test-parent</artifactId>
+ <artifactId>sal-remoterpc-connector-test-parent</artifactId>
<groupId>org.opendaylight.controller.tests</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
- <artifactId>zeromq-test-provider</artifactId>
+ <artifactId>sal-remoterpc-connector-test-provider</artifactId>
<packaging>bundle</packaging>
<scm>
<connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-api</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-impl</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-common-util</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-zeromq-connector</artifactId>
+ <artifactId>sal-remoterpc-connector</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
--- /dev/null
+package org.opendaylight.controller.sample.zeromq.provider;
+
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.*;
+
+public class ExampleProvider extends AbstractProvider implements RpcImplementation {
+
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "heartbeat");
+ private RpcRegistration reg;
+
+ private ServiceRegistration thisReg;
+
+ private ProviderSession session;
+ private Logger _logger = LoggerFactory.getLogger(ExampleProvider.class);
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ this.session = session;
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ Set<QName> supportedRpcs = new HashSet<QName>();
+ supportedRpcs.add(QNAME);
+ return supportedRpcs;
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(final QName rpc, CompositeNode input) {
+ boolean success = false;
+ CompositeNode output = null;
+ Collection<RpcError> errors = new ArrayList<>();
+
+ // Only handle supported RPC calls
+ if (getSupportedRpcs().contains(rpc)) {
+ if (input == null) {
+ errors.add(RpcErrors.getRpcError("app", "tag", "info", RpcError.ErrorSeverity.WARNING, "message:null input", RpcError.ErrorType.RPC, null));
+ }
+ else {
+ if (isErroneousInput(input)) {
+ errors.add(RpcErrors.getRpcError("app", "tag", "info", RpcError.ErrorSeverity.ERROR, "message:error", RpcError.ErrorType.RPC, null));
+ }
+ else {
+ success = true;
+ output = addSuccessNode(input);
+ }
+ }
+ }
+ return Rpcs.getRpcResult(success, output, errors);
+ }
+
+ // Examines input -- dives into CompositeNodes and finds any value equal to "error"
+ private boolean isErroneousInput(CompositeNode input) {
+ for (Node<?> n : input.getChildren()) {
+ if (n instanceof CompositeNode) {
+ if (isErroneousInput((CompositeNode)n)) {
+ return true;
+ }
+ }
+ else { //SimpleNode
+ if ((input.getChildren().get(0).getValue()).equals("error")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ // Adds a child SimpleNode containing the value "success" to the input CompositeNode
+ private CompositeNode addSuccessNode(CompositeNode input) {
+ List<Node<?>> list = new ArrayList<Node<?>>(input.getChildren());
+ SimpleNodeTOImpl<String> simpleNode = new SimpleNodeTOImpl<String>(QNAME, input, "success");
+ list.add(simpleNode);
+ return new CompositeNodeTOImpl(QNAME, null, list);
+ }
+
+ @Override
+ protected void startImpl(BundleContext context) {
+ thisReg = context.registerService(ExampleProvider.class, this, new Hashtable<String, String>());
+ }
+
+ @Override
+ protected void stopImpl(BundleContext context) {
+ if (reg != null) {
+ try {
+ reg.close();
+ thisReg.unregister();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void announce(QName name) {
+ _logger.debug("Announcing [{}]\n\n\n", name);
+ reg = this.session.addRpcImplementation(name, this);
+ }
+
+}
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>sal-remoterpc-connector-test-parent</artifactId>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>sal-remoterpc-connector-test-it</artifactId>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+
+ <properties>
+ <exam.version>3.0.0</exam.version>
+ <url.version>1.5.0</url.version>
+ <config.version>0.2.3-SNAPSHOT</config.version>
+ <netconf.version>0.2.3-SNAPSHOT</netconf.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.7</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>maven-paxexam-plugin</artifactId>
+ <version>1.2.4</version>
+ <executions>
+ <execution>
+ <id>generate-config</id>
+ <goals>
+ <goal>generate-depends-file</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse
+ m2e settings only. It has no influence on the Maven build itself. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.ops4j.pax.exam
+ </groupId>
+ <artifactId>
+ maven-paxexam-plugin
+ </artifactId>
+ <versionRange>
+ [1.2.4,)
+ </versionRange>
+ <goals>
+ <goal>
+ generate-depends-file
+ </goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+ <artifactId>xtend-lib-osgi</artifactId>
+ <version>2.4.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>sal-remoterpc-connector-test-provider</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>sal-remoterpc-connector-test-consumer</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-container-native</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-junit4</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-link-mvn</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.url</groupId>
+ <artifactId>pax-url-aether</artifactId>
+ <version>1.5.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>equinoxSDK381</groupId>
+ <artifactId>org.eclipse.osgi</artifactId>
+ <version>3.8.1.v20120830-144521</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-core-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-remoterpc-connector</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+ <!--dependency> <groupId>org.opendaylight.yangtools</groupId> <artifactId>yang-data-impl</artifactId>
+ <version>0.5.9-SNAPSHOT</version> </dependency -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-parser-impl</artifactId>
+ <version>0.5.9-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+ <artifactId>antlr4-runtime-osgi-nohead</artifactId>
+ <version>4.0</version>
+ </dependency>
+
+ <!-- routing table dependencies -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>zeromq-routingtable.implementation</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>clustering.services</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal.implementation</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager.it.implementation</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>clustering.stub</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.dependencymanager.shell</artifactId>
+ <version>3.0.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>eclipselink</groupId>
+ <artifactId>javax.resource</artifactId>
+ <version>1.5.0.v200906010428</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf-monitoring</artifactId>
+ <version>${netconf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>yang-ext</artifactId>
+ <version>2013.09.07.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>opendaylight-l2-types</artifactId>
+ <version>2013.08.27.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-it</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-config</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller.model</groupId>
+ <artifactId>model-inventory</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-connector-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>clustering.services</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>equinoxSDK381</groupId>
+ <artifactId>org.eclipse.osgi</artifactId>
+ <version>3.8.1.v20120830-144521</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.9.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.zeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <version>0.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+ <artifactId>xtend-lib-osgi</artifactId>
+ <version>2.4.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-container-native</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-junit4</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-netconf-connector</artifactId>
+ <version>${netconf.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>yang-store-impl</artifactId>
+ <version>${config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>logback-config</artifactId>
+ <version>${config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-persister-impl</artifactId>
+ <version>${config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-persister-file-adapter</artifactId>
+ <version>${config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-impl</artifactId>
+ <version>${netconf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-client</artifactId>
+ <version>${netconf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam</artifactId>
+ <version>${exam.version}</version>
+ <!-- Compile scope here is intentional, it is used in TestHelper
+ class which could be downloaded via nexus and reused in other integration
+ tests. -->
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-link-mvn</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>equinoxSDK381</groupId>
+ <artifactId>org.eclipse.osgi</artifactId>
+ <version>3.8.1.v20120830-144521</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.model</groupId>
+ <artifactId>model-flow-service</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-manager</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.model</groupId>
+ <artifactId>model-flow-management</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+ <artifactId>antlr4-runtime-osgi-nohead</artifactId>
+ <version>4.0</version>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sample.zeromq.test.it;
+
+import junit.framework.Assert;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.connector.remoterpc.Client;
+import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient;
+import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer;
+import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
+import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
+import org.opendaylight.controller.test.sal.binding.it.TestHelper;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.util.Filter;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import javax.inject.Inject;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Hashtable;
+
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
+import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
+import static org.ops4j.pax.exam.CoreOptions.*;
+
+@RunWith(PaxExam.class)
+public class RouterTest {
+
+ private Logger _logger = LoggerFactory.getLogger(RouterTest.class);
+
+ public static final String ODL = "org.opendaylight.controller";
+ public static final String YANG = "org.opendaylight.yangtools";
+ public static final String SAMPLE = "org.opendaylight.controller.tests";
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "heartbeat");
+
+
+ @Inject
+ org.osgi.framework.BundleContext ctx;
+
+ @Inject
+ @Filter(timeout=60*1000)
+ Broker broker;
+
+ private ZMQ.Context zmqCtx = ZMQ.context(1);
+ //private Server router;
+ //private ExampleProvider provider;
+
+ //@Test
+ public void testInvokeRpc() throws Exception{
+ //Thread.sleep(1000);
+ //Send announcement
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+
+ _logger.debug("Provider sends announcement [{}]", "heartbeat");
+ provider.announce(QNAME);
+ ServiceReference routerRef = ctx.getServiceReference(Client.class);
+ Client router = (Client) ctx.getService(routerRef);
+ _logger.debug("Found router[{}]", router);
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = router.invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild());
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertNotNull(result);
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithValidSimpleNode() throws Exception{
+ //Thread.sleep(1500);
+
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+ // Provider sends announcement
+ _logger.debug("Provider sends announcement [{}]", "heartbeat");
+ provider.announce(QNAME);
+ // Consumer invokes RPC
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ CompositeNode input = consumer.getValidCompositeNodeWithOneSimpleChild();
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+ Assert.assertNotNull(result);
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertTrue(result.isSuccessful());
+ Assert.assertNotNull(result.getResult());
+ Assert.assertEquals(0, result.getErrors().size());
+ Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size());
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithValidSimpleNodes() throws Exception{
+ //Thread.sleep(1500);
+
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+ // Provider sends announcement
+ _logger.debug("Provider sends announcement [{}]", "heartbeat");
+ provider.announce(QNAME);
+ // Consumer invokes RPC
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ CompositeNode input = consumer.getValidCompositeNodeWithFourSimpleChildren();
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+ Assert.assertNotNull(result);
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertTrue(result.isSuccessful());
+ Assert.assertNotNull(result.getResult());
+ Assert.assertEquals(0, result.getErrors().size());
+ Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size());
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithValidCompositeNode() throws Exception{
+ //Thread.sleep(1500);
+
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+ // Provider sends announcement
+ _logger.debug("Provider sends announcement [{}]", "heartbeat");
+ provider.announce(QNAME);
+ // Consumer invokes RPC
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ CompositeNode input = consumer.getValidCompositeNodeWithTwoCompositeChildren();
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+ Assert.assertNotNull(result);
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertTrue(result.isSuccessful());
+ Assert.assertNotNull(result.getResult());
+ Assert.assertEquals(0, result.getErrors().size());
+ Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size());
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithNullInput() throws Exception{
+ //Thread.sleep(1500);
+
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+ // Provider sends announcement
+ _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName());
+ provider.announce(QNAME);
+ // Consumer invokes RPC
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, null);
+ Assert.assertNotNull(result);
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertNull(result.getResult());
+ Assert.assertEquals(1, result.getErrors().size());
+ Assert.assertEquals(RpcError.ErrorSeverity.WARNING, ((RpcError)result.getErrors().toArray()[0]).getSeverity());
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithInvalidSimpleNode() throws Exception{
+ //Thread.sleep(1500);
+
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+ // Provider sends announcement
+ _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName());
+ provider.announce(QNAME);
+ // Consumer invokes RPC
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ CompositeNode input = consumer.getInvalidCompositeNodeSimpleChild();
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+ Assert.assertNotNull(result);
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertNull(result.getResult());
+ Assert.assertEquals(1, result.getErrors().size());
+ Assert.assertEquals(RpcError.ErrorSeverity.ERROR, ((RpcError)result.getErrors().toArray()[0]).getSeverity());
+ }
+ }
+
+ @Test
+ public void testInvokeRpcWithInvalidCompositeNode() throws Exception{
+ //Thread.sleep(1500);
+
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ Assert.assertNotNull(providerRef);
+ ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+ Assert.assertNotNull(provider);
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ Assert.assertNotNull(consumerRef);
+ ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+ Assert.assertNotNull(consumer);
+
+ // Provider sends announcement
+ _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName());
+ provider.announce(QNAME);
+ // Consumer invokes RPC
+ _logger.debug("Invoking RPC [{}]", QNAME);
+ CompositeNode input = consumer.getInvalidCompositeNodeCompositeChild();
+ for (int i = 0; i < 3; i++) {
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, input);
+ Assert.assertNotNull(result);
+ _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertNull(result.getResult());
+ Assert.assertEquals(1, result.getErrors().size());
+ Assert.assertEquals(RpcError.ErrorSeverity.ERROR, ((RpcError)result.getErrors().toArray()[0]).getSeverity());
+ }
+ }
+
+ //@Test
+ // This method is UNTESTED -- need to get around the bundling issues before I know if this even work
+// public void testInvokeRpcWithValidCompositeNode() throws Exception{
+// Thread.sleep(10000);
+// //Send announcement
+// ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+// Assert.assertNotNull(providerRef);
+//
+// ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef);
+// Assert.assertNotNull(provider);
+//
+// ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+// Assert.assertNotNull(consumerRef);
+//
+// ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef);
+// Assert.assertNotNull(consumer);
+//
+// _logger.debug("Provider sends announcement [{}]", "heartbeat");
+// provider.announce(QNAME);
+// ServiceReference routerRef = ctx.getServiceReference(Client.class);
+// Client router = (Client) ctx.getService(routerRef);
+// _logger.debug("Found router[{}]", router);
+// _logger.debug("Invoking RPC [{}]", QNAME);
+// for (int i = 0; i < 3; i++) {
+// RpcResult<CompositeNode> result = router.getInstance().invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild());
+// _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors());
+// Assert.assertNotNull(result);
+// }
+// }
+
+ private Message send(Message msg) throws IOException {
+ ZMQ.Socket reqSocket = zmqCtx.socket(ZMQ.REQ);
+ reqSocket.connect("tcp://localhost:5555");
+ reqSocket.send(Message.serialize(msg));
+ Message response = parseMessage(reqSocket);
+
+ return response;
+ }
+
+ /**
+ * @param socket
+ * @return
+ */
+ private Message parseMessage(ZMQ.Socket socket) {
+
+ Message msg = null;
+ try {
+ byte[] bytes = socket.recv();
+ _logger.debug("Received bytes:[{}]", bytes.length);
+ msg = (Message) Message.deserialize(bytes);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ return msg;
+ }
+
+
+ private void printState(){
+ Bundle[] b = ctx.getBundles();
+ _logger.debug("\n\nNumber of bundles [{}]\n\n]", b.length);
+ for (int i=0;i<b.length;i++){
+ _logger.debug("Bundle States {}-{} ",b[i].getSymbolicName(), stateToString(b[i].getState()));
+
+ if ( Bundle.INSTALLED == b[i].getState() || (Bundle.RESOLVED == b[i].getState())){
+ try {
+ b[i].start();
+ } catch (BundleException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ private String stateToString(int state) {
+ switch (state) {
+ case Bundle.ACTIVE:
+ return "ACTIVE";
+ case Bundle.INSTALLED:
+ return "INSTALLED";
+ case Bundle.RESOLVED:
+ return "RESOLVED";
+ case Bundle.UNINSTALLED:
+ return "UNINSTALLED";
+ default:
+ return "Not CONVERTED";
+ }
+ }
+
+ @Configuration
+ public Option[] config() {
+ return options(systemProperty("osgi.console").value("2401"),
+ systemProperty("rpc.port").value("5555"),
+ mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+ mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+
+ //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-common").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+ mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+
+
+
+ baseModelBundles(),
+ bindingAwareSalBundles(),
+ TestHelper.bindingIndependentSalBundles(),
+ TestHelper.configMinumumBundles(),
+ TestHelper.mdSalCoreBundles(),
+
+ //Added the consumer
+ mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), //
+ //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing
+ //**** This causes the "Message" error to occur, the class cannot be found
+ mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), //
+ mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), //
+
+ mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(),
+ mavenBundle(YANG, "concepts").versionAsInProject(),
+ mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+ mavenBundle(YANG, "yang-common").versionAsInProject(), //
+ mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-data-impl").versionAsInProject(), //
+ mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-parser-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), //
+ mavenBundle(YANG, "yang-model-util").versionAsInProject(), //
+ mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+ mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), //
+ mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+ mavenBundle("org.zeromq", "jeromq").versionAsInProject(),
+ mavenBundle("org.codehaus.jackson", "jackson-mapper-asl").versionAsInProject(),
+ mavenBundle("org.codehaus.jackson", "jackson-core-asl").versionAsInProject(),
+ //routingtable dependencies
+ systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
+ // List framework bundles
+ mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(),
+ // List logger bundles
+
+ mavenBundle("org.opendaylight.controller", "clustering.services")
+ .versionAsInProject(),
+ mavenBundle("org.opendaylight.controller", "clustering.stub")
+ .versionAsInProject(),
+
+
+ // List all the bundles on which the test case depends
+ mavenBundle("org.opendaylight.controller", "sal")
+ .versionAsInProject(),
+ mavenBundle("org.opendaylight.controller", "sal.implementation")
+ .versionAsInProject(),
+ mavenBundle("org.jboss.spec.javax.transaction",
+ "jboss-transaction-api_1.1_spec").versionAsInProject(),
+ mavenBundle("org.apache.commons", "commons-lang3")
+ .versionAsInProject(),
+ mavenBundle("org.apache.felix",
+ "org.apache.felix.dependencymanager")
+ .versionAsInProject(),
+
+ junitBundles()
+ );
+ }
+
+}
--- /dev/null
+//START OF CONFIG-LAST
+<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:schema-service-singleton</type>
+ <name>yang-schema-service</name>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:hash-map-data-store</type>
+ <name>hash-map-data-store</name>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-broker-impl</type>
+ <name>dom-broker</name>
+ <data-store xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-data-store</type>
+ <name>ref_hash-map-data-store</name>
+ </data-store>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-broker-impl</type>
+ <name>binding-broker-impl</name>
+ <notification-service xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-notification-service</type>
+ <name>ref_binding-notification-broker</name>
+ </notification-service>
+ <data-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
+ <name>ref_binding-data-broker</name>
+ </data-broker>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:runtime-generated-mapping</type>
+ <name>runtime-mapping-singleton</name>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-notification-broker</type>
+ <name>binding-notification-broker</name>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-data-broker</type>
+ <name>binding-data-broker</name>
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+ <name>ref_dom-broker</name>
+ </dom-broker>
+ <mapping-service xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">binding:binding-dom-mapping-service</type>
+ <name>ref_runtime-mapping-singleton</name>
+ </mapping-service>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">prefix:remote-zeromq-rpc-server</type>
+ <name>remoter</name>
+ <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">5666</port>
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+ <name>ref_dom-broker</name>
+ </dom-broker>
+ </module>
+</modules>
+<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <service>
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
+ <instance>
+ <name>ref_yang-schema-service</name>
+ <provider>/config/modules/module[name='schema-service-singleton']/instance[name='yang-schema-service']</provider>
+ </instance>
+ </service>
+ <service>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-notification-service</type>
+ <instance>
+ <name>ref_binding-notification-broker</name>
+ <provider>/config/modules/module[name='binding-notification-broker']/instance[name='binding-notification-broker']</provider>
+ </instance>
+ </service>
+ <service>
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-data-store</type>
+ <instance>
+ <name>ref_hash-map-data-store</name>
+ <provider>/config/modules/module[name='hash-map-data-store']/instance[name='hash-map-data-store']</provider>
+ </instance>
+ </service>
+ <service>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-broker-osgi-registry</type>
+ <instance>
+ <name>ref_binding-broker-impl</name>
+ <provider>/config/modules/module[name='binding-broker-impl']/instance[name='binding-broker-impl']</provider>
+ </instance>
+ </service>
+ <service>
+ <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">binding-impl:binding-dom-mapping-service</type>
+ <instance>
+ <name>ref_runtime-mapping-singleton</name>
+ <provider>/config/modules/module[name='runtime-generated-mapping']/instance[name='runtime-mapping-singleton']</provider>
+ </instance>
+ </service>
+ <service>
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+ <instance>
+ <name>ref_dom-broker</name>
+ <provider>/config/modules/module[name='dom-broker-impl']/instance[name='dom-broker']</provider>
+ </instance>
+ </service>
+ <service>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
+ <instance>
+ <name>ref_binding-data-broker</name>
+ <provider>/config/modules/module[name='binding-data-broker']/instance[name='binding-data-broker']</provider>
+ </instance>
+ </service>
+</services>
+</data>
+
+
+//END OF SNAPSHOT
+urn:opendaylight:params:xml:ns:yang:controller:config?module=config&revision=2013-04-05
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl?module=opendaylight-sal-binding-broker-impl&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl?module=opendaylight-sal-dom-broker-impl&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&revision=2013-10-28
+urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&revision=2013-10-28
+//END OF CONFIG
--- /dev/null
+<configuration scan="true">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+
+ <logger name="org.opendaylight.yangtools.yang.parser.util.ModuleDependencySort" level="ERROR"/>
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>sal-remoterpc-connector-test-parent</artifactId>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sal-remoterpc-connector-test-nb</artifactId>
+ <packaging>bundle</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>${bundle.plugin.version}</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ </Export-Package>
+ <Import-Package>
+ com.sun.jersey.spi.container.servlet,
+ org.codehaus.jackson.annotate,
+ javax.ws.rs,
+ javax.ws.rs.core,
+ javax.xml.bind,
+ javax.xml.bind.annotation,
+ org.slf4j,
+ org.apache.catalina.filters,
+ org.codehaus.jackson.jaxrs,
+ org.opendaylight.controller.sample.zeromq.provider,
+ org.opendaylight.controller.sample.zeromq.consumer,
+ org.opendaylight.controller.sal.utils,
+ org.opendaylight.yangtools.yang.common,
+ org.opendaylight.controller.sal.connector.api,
+ org.opendaylight.controller.sal.connector.remoterpc.api;version="[0.4,1)",
+ org.opendaylight.controller.sal.connector.remoterpc.impl;version="[0.4,1)",
+ org.opendaylight.controller.sal.connector.remoterpc.dto,
+ org.opendaylight.controller.sal.connector.remoterpc.util,
+ org.osgi.framework,
+ com.google.common.base,
+ org.opendaylight.yangtools.yang.data.api,
+ !org.codehaus.enunciate.jaxrs
+
+ </Import-Package>
+ <Web-ContextPath>/controller/nb/v2/zmqnb</Web-ContextPath>
+ <Jaxrs-Resources>,${classes;ANNOTATION;javax.ws.rs.Path}</Jaxrs-Resources>
+ </instructions>
+ <manifestLocation>${project.basedir}/src/main/resources/META-INF</manifestLocation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.northbound</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>sal-remoterpc-connector-test-provider</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>sal-remoterpc-connector-test-consumer</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-remoterpc-connector</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>zeromq-routingtable.implementation</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ </dependencies>
+
+ </project>
--- /dev/null
+package org.opendaylight.controller.tests.zmqrouter.rest;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
+import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.osgi.framework.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Set;
+
+@Path("router")
+public class Router {
+ private Logger _logger = LoggerFactory.getLogger(Router.class);
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "heartbeat");
+
+
+ @GET
+ @Path("/hello")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String hello() {
+ return "Hello";
+ }
+
+ @GET
+ @Path("/announce")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String announce() {
+ _logger.info("Announce request received");
+
+ BundleContext ctx = getBundleContext();
+ ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class);
+ if (providerRef == null) {
+ _logger.debug("Could not get provider reference");
+ return "Could not get provider reference";
+ }
+
+ ExampleProvider provider = (ExampleProvider) ctx.getService(providerRef);
+ if (provider == null) {
+ _logger.info("Could not get provider service");
+ return "Could not get provider service";
+ }
+
+ provider.announce(QNAME);
+ return "Announcement sent ";
+
+ }
+
+ @GET
+ @Path("/rpc")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String invokeRpc() throws Exception {
+ _logger.info("Invoking RPC");
+
+ ExampleConsumer consumer = getConsumer();
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, new CompositeNodeImpl());
+ _logger.info("Result [{}]", result.isSuccessful());
+
+ return stringify(result);
+ }
+
+ @GET
+ @Path("/rpc-success")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String invokeRpcSuccess() throws Exception {
+ ExampleConsumer consumer = getConsumer();
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, consumer.getValidCompositeNodeWithFourSimpleChildren()); //TODO: Change this
+ _logger.info("Result [{}]", result.isSuccessful());
+
+ return stringify(result);
+ }
+
+ @GET
+ @Path("/rpc-failure")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String invokeRpcFailure() throws Exception {
+ ExampleConsumer consumer = getConsumer();
+ //RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, consumer.getInvalidCompositeNodeCompositeChild()); //TODO: Change this
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, null); //TODO: Change this
+ _logger.info("Result [{}]", result.isSuccessful());
+
+ return stringify(result);
+ }
+
+ @GET
+ @Path("/routingtable")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String invokeRoutingTable() {
+ _logger.info("Invoking adding an entry in routing table");
+
+ BundleContext ctx = getBundleContext();
+ ServiceReference routingTableServiceReference = ctx.getServiceReference(RoutingTable.class);
+ if (routingTableServiceReference == null) {
+ _logger.debug("Could not get routing table impl reference");
+ return "Could not get routingtable referen ";
+ }
+ RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
+ if (routingTable == null) {
+ _logger.info("Could not get routing table service");
+ return "Could not get routing table service";
+ }
+
+
+ RoutingIdentifierImpl rii = new RoutingIdentifierImpl();
+ try {
+ routingTable.addGlobalRoute(rii.toString(), "172.27.12.1:5000");
+ } catch (RoutingTableException e) {
+ _logger.error("error in adding routing identifier" + e.getMessage());
+
+ } catch (SystemException e) {
+ _logger.error("error in adding routing identifier" + e.getMessage());
+ }
+
+ Set<String> routes = routingTable.getRoutes(rii.toString());
+
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String route : routes) {
+ stringBuilder.append(route);
+ }
+
+ _logger.info("Result [{}] routes added for route" + rii + stringBuilder.toString());
+
+ return stringBuilder.toString();
+ }
+
+ @GET
+ @Path("/routingtabledelete")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String invokeDeleteRoutingTable() {
+ _logger.info("Invoking adding an entry in routing table");
+
+ BundleContext ctx = getBundleContext();
+ ServiceReference routingTableServiceReference = ctx.getServiceReference(RoutingTable.class);
+ if (routingTableServiceReference == null) {
+ _logger.debug("Could not get routing table impl reference");
+ return "Could not get routingtable referen ";
+ }
+ RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
+ if (routingTable == null) {
+ _logger.info("Could not get routing table service");
+ return "Could not get routing table service";
+ }
+
+
+ RoutingIdentifierImpl rii = new RoutingIdentifierImpl();
+ try {
+ routingTable.removeGlobalRoute(rii.toString());
+ } catch (RoutingTableException e) {
+ _logger.error("error in adding routing identifier" + e.getMessage());
+
+ } catch (SystemException e) {
+ _logger.error("error in adding routing identifier" + e.getMessage());
+ }
+
+ Set<String> routes = routingTable.getRoutes(rii.toString());
+
+ StringBuilder stringBuilder = new StringBuilder();
+ if (routes != null) {
+ for (String route : routes) {
+ stringBuilder.append(route);
+ }
+ } else {
+ stringBuilder.append(" successfully");
+ }
+
+ _logger.info("Result [{}] routes removed for route" + rii + stringBuilder.toString());
+
+ return stringBuilder.toString();
+ }
+
+ private String stringify(RpcResult<CompositeNode> result) {
+ CompositeNode node = result.getResult();
+ StringBuilder builder = new StringBuilder("result:").append(XmlUtils.compositeNodeToXml(node)).append("\n")
+ .append("error:").append(result.getErrors()).append("\n");
+
+ return builder.toString();
+ }
+
+ private BundleContext getBundleContext() {
+ ClassLoader tlcl = Thread.currentThread().getContextClassLoader();
+ Bundle bundle = null;
+
+ if (tlcl instanceof BundleReference) {
+ bundle = ((BundleReference) tlcl).getBundle();
+ } else {
+ _logger.info("Unable to determine the bundle context based on " +
+ "thread context classloader.");
+ bundle = FrameworkUtil.getBundle(this.getClass());
+ }
+ return (bundle == null ? null : bundle.getBundleContext());
+ }
+
+ private ExampleConsumer getConsumer() {
+ BundleContext ctx = getBundleContext();
+ ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class);
+ if (consumerRef == null) {
+ _logger.debug("Could not get consumer reference");
+ throw new NullPointerException("Could not get consumer reference");
+ }
+ ExampleConsumer consumer = (ExampleConsumer) ctx.getService(consumerRef);
+ if (consumer == null) {
+ _logger.info("Could not get consumer service");
+ throw new NullPointerException("Could not get consumer service");
+ }
+ return consumer;
+ }
+
+ class RoutingIdentifierImpl implements RpcRouter.RouteIdentifier, Serializable {
+
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "global");
+ private final QName instance = new QName(URI.create("127.0.0.1"), "local");
+
+ @Override
+ public QName getContext() {
+ return QNAME;
+ }
+
+ @Override
+ public QName getType() {
+ return QNAME;
+ }
+
+ @Override
+ public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getRoute() {
+ return InstanceIdentifier.of(instance);
+ }
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+ version="3.0">
+ <servlet>
+ <servlet-name>JAXRSZmq</servlet-name>
+ <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>javax.ws.rs.Application</param-name>
+ <param-value>org.opendaylight.controller.northbound.commons.NorthboundApplication</param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>JAXRSZmq</servlet-name>
+ <url-pattern>/*</url-pattern>
+ </servlet-mapping>
+
+
+
+ <security-constraint>
+ <web-resource-collection>
+ <web-resource-name>NB api</web-resource-name>
+ <url-pattern>/*</url-pattern>
+ <http-method>POST</http-method>
+ <http-method>GET</http-method>
+ <http-method>PUT</http-method>
+ <http-method>PATCH</http-method>
+ <http-method>DELETE</http-method>
+ <http-method>HEAD</http-method>
+ </web-resource-collection>
+ <auth-constraint>
+ <role-name>System-Admin</role-name>
+ <role-name>Network-Admin</role-name>
+ <role-name>Network-Operator</role-name>
+ <role-name>Container-User</role-name>
+ </auth-constraint>
+ </security-constraint>
+
+ <security-role>
+ <role-name>System-Admin</role-name>
+ </security-role>
+ <security-role>
+ <role-name>Network-Admin</role-name>
+ </security-role>
+ <security-role>
+ <role-name>Network-Operator</role-name>
+ </security-role>
+ <security-role>
+ <role-name>Container-User</role-name>
+ </security-role>
+
+ <login-config>
+ <auth-method>BASIC</auth-method>
+ <realm-name>opendaylight</realm-name>
+ </login-config>
+</web-app>
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>sal-zeromq-connector</artifactId>
- <packaging>bundle</packaging>
-
- <properties>
- <scala.version>2.10.3</scala.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- <configuration>
- <instructions>
- <Import-Package>
- org.opendaylight.controller.sal.connector.api,
- org.opendaylight.controller.sal.core.api,
- org.opendaylight.yangtools.concepts;version="[0.1,1)",
- org.opendaylight.yangtools.yang.common;version="[0.5,1)",
- org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
- org.zeromq;version="[0.3,1)"
- </Import-Package>
- <Bundle-Activator>org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Activator</Bundle-Activator>
- </instructions>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.6</version>
- <configuration>
- <recompileMode>incremental</recompileMode>
- <args>
- <arg>-target:jvm-1.7</arg>
- </args>
- <javacArgs>
- <javacArg>-source</javacArg><javacArg>1.7</javacArg>
- <javacArg>-target</javacArg><javacArg>1.7</javacArg>
- </javacArgs>
- </configuration>
- <executions>
- <execution>
- <id>scala-compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
-
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <executions>
- <execution>
- <id>default-compile</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>default-testCompile</id>
- <phase>none</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>containermanager</artifactId>
- <version>0.5.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>commons.northbound</artifactId>
- <version>0.4.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal</artifactId>
- <version>0.5.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-binding</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-connector-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-util</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jeromq</groupId>
- <artifactId>jeromq</artifactId>
- <version>0.3.0-SNAPSHOT</version>
- </dependency>
-
- </dependencies>
- <repositories>
- <repository>
- <id>sonatype-nexus-snapshots</id>
- <url>https://oss.sonatype.org/content/repositories/snapshots</url>
- <releases>
- <enabled>false</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
-
-</project>
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.api;
-
-import java.util.Map;
-import java.util.Set;
-
-public interface RouteChange<I, R> {
-
- Map<I, Set<R>> getRemovals();
- Map<I, Set<R>> getAnnouncements();
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
-
-import org.opendaylight.controller.sal.core.api.AbstractProvider;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.osgi.framework.BundleContext;
-
-public class Activator extends AbstractProvider {
-
- ZeroMqRpcRouter router;
-
- @Override
- public void onSessionInitiated(ProviderSession session) {
- router = ZeroMqRpcRouter.getInstance();
- router.setBrokerSession(session);
- router.start();
- }
-
- @Override
- protected void stopImpl(BundleContext context) {
- router.stop();
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-/**
- * ZeroMq based implementation of RpcRouter
- * TODO:
- * 1. Make it multi VM aware
- * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
- * 3. sendRpc() should use connection pooling
- * 4. Read properties from config file using existing(?) ODL properties framework
- */
-public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
-
- private ExecutorService serverPool;
- private static ExecutorService handlersPool;
-
- private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
-
- private ProviderSession brokerSession;
-
- private ZMQ.Context context;
- private ZMQ.Socket publisher;
- private ZMQ.Socket subscriber;
- private ZMQ.Socket replySocket;
-
- private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
-
- private final RpcFacade facade = new RpcFacade();
- private final RpcListener listener = new RpcListener();
-
- private final String localIp = getLocalIpAddress();
-
- private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
- private String subPort = System.getProperty("sub.port");// other controller's pub port
- private String pubIp = System.getProperty("pub.ip"); // other controller's ip
- private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
-
- private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
-
- //Prevent instantiation
- private ZeroMqRpcRouter() {
- }
-
- public static ZeroMqRpcRouter getInstance() {
- return _instance;
- }
-
- public void start() {
- context = ZMQ.context(2);
- publisher = context.socket(ZMQ.PUB);
- int ret = publisher.bind("tcp://*:" + pubPort);
- // serverPool = Executors.newSingleThreadExecutor();
- serverPool = Executors.newCachedThreadPool();
- handlersPool = Executors.newCachedThreadPool();
- routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
-
- // Start listening for announce and rpc messages
- serverPool.execute(receive());
-
- brokerSession.addRpcRegistrationListener(listener);
-
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
-
- }
-
- public void stop() {
- if (handlersPool != null)
- handlersPool.shutdown();
- if (serverPool != null)
- serverPool.shutdown();
- if (publisher != null) {
- publisher.setLinger(0);
- publisher.close();
- }
- if (replySocket != null) {
- replySocket.setLinger(0);
- replySocket.close();
- }
- if (subscriber != null) {
- subscriber.setLinger(0);
- subscriber.close();
- }
- if (context != null)
- context.term();
-
- }
-
- private Runnable receive() {
- return new Runnable() {
- public void run() {
- try {
- // Bind to RPC reply socket
- replySocket = context.socket(ZMQ.REP);
- replySocket.bind("tcp://*:" + rpcPort);
-
- // Bind to publishing controller
- subscriber = context.socket(ZMQ.SUB);
- String pubAddress = "tcp://" + pubIp + ":" + subPort;
- subscriber.connect(pubAddress);
- _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
-
- //subscribe for announcements
- //TODO: Message type would be changed. Update this
- subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
-
- // Poller enables listening on multiple sockets using a single thread
- ZMQ.Poller poller = new ZMQ.Poller(2);
- poller.register(replySocket, ZMQ.Poller.POLLIN);
- poller.register(subscriber, ZMQ.Poller.POLLIN);
-
- //TODO: Add code to restart the thread after exception
- while (!Thread.currentThread().isInterrupted()) {
-
- poller.poll();
-
- if (poller.pollin(0)) {
- handleRpcCall();
- }
- if (poller.pollin(1)) {
- handleAnnouncement();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- replySocket.setLinger(0);
- replySocket.close();
- subscriber.setLinger(0);
- subscriber.close();
- }
- };
- }
-
- /**
- * @throws IOException
- * @throws ClassNotFoundException
- */
- private void handleAnnouncement() throws IOException, ClassNotFoundException {
-
- _logger.info("Announcement received");
- Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
-
- if (subscriber.hasReceiveMore()) {
- try {
- Message m = (Message) Message.deserialize(subscriber.recv());
- _logger.debug("Announcement message [{}]", m);
-
- // TODO: check on msg type or topic. Both
- // should be same. Need to normalize.
- if (Message.MessageType.ANNOUNCE == m.getType())
- updateRoutingTable(m);
- } catch (IOException | ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
-
- }
-
- /**
- * @throws InterruptedException
- * @throws ExecutionException
- */
- private void handleRpcCall() throws InterruptedException, ExecutionException {
- try {
- Message request = parseMessage(replySocket);
-
- _logger.debug("Received rpc request [{}]", request);
-
- // Call broker to process the message then reply
- Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
- (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
-
- RpcResult<CompositeNode> result = rpc.get();
-
- Message response = new Message.MessageBuilder()
- .type(MessageType.RESPONSE)
- .sender(localIp + ":" + rpcPort)
- .route(request.getRoute())
- //.payload(result) TODO: enable and test
- .build();
-
- replySocket.send(Message.serialize(response));
-
- _logger.debug("Sent rpc response [{}]", response);
-
- } catch (IOException ex) {
- //TODO: handle exception and send error codes to caller
- ex.printStackTrace();
- }
- }
-
-
- @Override
- public Future<RpcReply<Object>> sendRpc(
- final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
-
- return handlersPool.submit(new Callable<RpcReply<Object>>() {
-
- @Override
- public RpcReply<Object> call() {
- ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
-
- // TODO pick the ip and port from routing table based on routing identifier
- requestSocket.connect("tcp://" + pubIp + ":5554");
-
- Message requestMessage = new Message.MessageBuilder()
- .type(MessageType.REQUEST)
- .sender(localIp + ":" + rpcPort)
- .route(input.getRoutingInformation())
- .payload(input.getPayload())
- .build();
-
- _logger.debug("Sending rpc request [{}]", requestMessage);
-
- RpcReply<Object> reply = null;
-
- try {
-
- requestSocket.send(Message.serialize(requestMessage));
- final Message response = parseMessage(requestSocket);
-
- _logger.debug("Received response [{}]", response);
-
- reply = new RpcReply<Object>() {
-
- @Override
- public Object getPayload() {
- return response.getPayload();
- }
- };
- } catch (IOException ex) {
- // TODO: Pass exception back to the caller
- ex.printStackTrace();
- }
-
- return reply;
- }
- });
- }
-
- /**
- * TODO: Remove this implementation and use RoutingTable implementation to send announcements
- * Publishes a notice to other controllers in the cluster
- *
- * @param notice
- */
- public void publish(final Message notice) {
- Runnable task = new Runnable() {
- public void run() {
-
- try {
-
- publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
- publisher.send(Message.serialize(notice));
- _logger.debug("Announcement sent [{}]", notice);
- } catch (IOException ex) {
- _logger.error("Error in sending announcement [{}]", notice);
- ex.printStackTrace();
- }
- }
- };
- handlersPool.execute(task);
- }
-
- /**
- * Finds IPv4 address of the local VM
- * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
- * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
- * Should we use IP or hostname?
- *
- * @return
- */
- private String getLocalIpAddress() {
- String hostAddress = null;
- Enumeration e = null;
- try {
- e = NetworkInterface.getNetworkInterfaces();
- } catch (SocketException e1) {
- e1.printStackTrace();
- }
- while (e.hasMoreElements()) {
-
- NetworkInterface n = (NetworkInterface) e.nextElement();
-
- Enumeration ee = n.getInetAddresses();
- while (ee.hasMoreElements()) {
- InetAddress i = (InetAddress) ee.nextElement();
- if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
- hostAddress = i.getHostAddress();
- }
- }
- return hostAddress;
-
- }
-
- /**
- * TODO: Change to use external routing table implementation
- *
- * @param msg
- */
- private void updateRoutingTable(Message msg) {
- routingTable.put(msg.getRoute(), msg.getSender());
- RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
-
- // Currently only registers rpc implementation.
- // TODO: do registration for instance based routing
- QName rpcType = route.getType();
- RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
- _logger.debug("Routing table updated");
- }
-
- /**
- * @param socket
- * @return
- */
- private Message parseMessage(ZMQ.Socket socket) {
-
- Message msg = null;
- try {
- byte[] bytes = socket.recv();
- _logger.debug("Received bytes:[{}]", bytes.length);
- msg = (Message) Message.deserialize(bytes);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- return msg;
- }
-
- private class RpcFacade implements RpcImplementation {
-
- @Override
- public Set<QName> getSupportedRpcs() {
- return Collections.emptySet();
- }
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(rpc);
-
- RpcRequestImpl request = new RpcRequestImpl();
- request.setRouteIdentifier(routeId);
- request.setPayload(input);
-
- final Future<RpcReply<Object>> ret = sendRpc(request);
-
- //TODO: Review result handling
- RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
- @Override
- public boolean isSuccessful() {
- try {
- ret.get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
-
- @Override
- public CompositeNode getResult() {
- return null;
- }
-
- @Override
- public Collection<RpcError> getErrors() {
- return Collections.EMPTY_LIST;
- }
- };
- return result;
- }
- }
-
- /**
- * Listener for rpc registrations
- */
- private class RpcListener implements RpcRegistrationListener {
-
- @Override
- public void onRpcImplementationAdded(QName name) {
-
- _logger.debug("Announcing registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- //TODO: Make notice immutable and change message type
- Message notice = new Message.MessageBuilder()
- .type(MessageType.ANNOUNCE)
- .sender("tcp://" + localIp + ":" + rpcPort)
- .route(routeId)
- .build();
-
- publish(notice);
- }
-
- @Override
- public void onRpcImplementationRemoved(QName name) {
- // TODO: send a rpc-deregistrtation notice
-
- }
- }
-
- public void setBrokerSession(ProviderSession session) {
- this.brokerSession = session;
-
- }
-
-}
+++ /dev/null
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>sal-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- <groupId>org.opendaylight.controller</groupId>
- </parent>
- <packaging>pom</packaging>
- <groupId>org.opendaylight.controller.tests</groupId>
- <artifactId>sal-test-parent</artifactId>
- <scm>
- <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
- <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
- <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
- </scm>
-
- <modules>
- <module>zeromq-test-consumer</module>
- <module>zeromq-test-it</module>
- <module>zeromq-test-provider</module>
- </modules>
-
-</project>
+++ /dev/null
-package org.opendaylight.controller.sample.zeromq.consumer;
-
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.controller.sal.core.api.AbstractConsumer;
-import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-
-public class ExampleConsumer extends AbstractConsumer {
-
- private final URI namespace = URI.create("http://cisco.com/example");
- private final QName QNAME = new QName(namespace,"heartbeat");
-
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- private ConsumerSession session;
-
-
- @Override
- public void onSessionInitiated(ConsumerSession session) {
- this.session = session;
- executor.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- int count = 0;
- try {
- Future<RpcResult<CompositeNode>> future = ExampleConsumer.this.session.rpc(QNAME, null);
- RpcResult<CompositeNode> result = future.get();
- System.out.println("Result received. Status is :" + result.isSuccessful());
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- }, 0, 10, TimeUnit.SECONDS);
- }
-
- @Override
- protected void stopImpl(BundleContext context) {
- // TODO Auto-generated method stub
- super.stopImpl(context);
- executor.shutdown();
- }
-}
+++ /dev/null
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>sal-test-parent</artifactId>
- <groupId>org.opendaylight.controller.tests</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <artifactId>zeromq-test-it</artifactId>
- <scm>
- <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
- <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
- <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
- </scm>
-
- <properties>
- <exam.version>3.0.0</exam.version>
- <url.version>1.5.0</url.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.ops4j.pax.exam</groupId>
- <artifactId>maven-paxexam-plugin</artifactId>
- <version>1.2.4</version>
- <executions>
- <execution>
- <id>generate-config</id>
- <goals>
- <goal>generate-depends-file</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings
- only. It has no influence on the Maven build itself. -->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- org.ops4j.pax.exam
- </groupId>
- <artifactId>
- maven-paxexam-plugin
- </artifactId>
- <versionRange>
- [1.2.4,)
- </versionRange>
- <goals>
- <goal>
- generate-depends-file
- </goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore></ignore>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.opendaylight.yangtools.thirdparty</groupId>
- <artifactId>xtend-lib-osgi</artifactId>
- <version>2.4.3</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller.tests</groupId>
- <artifactId>zeromq-test-provider</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller.tests</groupId>
- <artifactId>zeromq-test-consumer</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-broker-impl</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.ops4j.pax.exam</groupId>
- <artifactId>pax-exam-container-native</artifactId>
- <version>${exam.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.ops4j.pax.exam</groupId>
- <artifactId>pax-exam-junit4</artifactId>
- <version>${exam.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.ops4j.pax.exam</groupId>
- <artifactId>pax-exam-link-mvn</artifactId>
- <version>${exam.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>equinoxSDK381</groupId>
- <artifactId>org.eclipse.osgi</artifactId>
- <version>3.8.1.v20120830-144521</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>1.7.2</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <version>1.0.9</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.0.9</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-binding-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-util</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-core-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
-
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>containermanager</artifactId>
- <version>0.5.1-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal</artifactId>
- <version>0.5.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-binding</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-data-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-util</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
+++ /dev/null
-package org.opendaylight.controller.sample.zeromq.test.it;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.BundleContext;
-
-import javax.inject.Inject;
-
-import static org.junit.Assert.assertTrue;
-import static org.ops4j.pax.exam.CoreOptions.*;
-
-@RunWith(PaxExam.class)
-public class ServiceConsumerController {
-
- public static final String ODL = "org.opendaylight.controller";
- public static final String YANG = "org.opendaylight.yangtools";
- public static final String SAMPLE = "org.opendaylight.controller.samples";
-
- @Test
- public void properInitialized() throws Exception {
-
- Thread.sleep(30000); // Waiting for services to get wired.
- assertTrue(true);
- //assertTrue(consumer.createToast(WhiteBread.class, 5));
-
- }
-
-// @Inject
-// BindingAwareBroker broker;
-
-// @Inject
-// ToastConsumer consumer;
-
- @Inject
- BundleContext ctx;
-
- @Configuration
- public Option[] config() {
- return options(systemProperty("osgi.console").value("2401"),
- systemProperty("pub.port").value("5557"),
- systemProperty("sub.port").value("5556"),
- systemProperty("rpc.port").value("5555"),
- systemProperty("pub.ip").value("localhost"),
- mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
- mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
- mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
- mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
-
- //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-common").versionAsInProject(), //
- mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
- mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
- mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
- mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
- mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
- mavenBundle(SAMPLE, "zeromq-test-consumer").versionAsInProject(), //
- mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
- mavenBundle(YANG, "concepts").versionAsInProject(),
- mavenBundle(YANG, "yang-binding").versionAsInProject(), //
- mavenBundle(YANG, "yang-common").versionAsInProject(), //
- mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
- mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
- mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
- mavenBundle("com.google.guava", "guava").versionAsInProject(), //
- mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
- junitBundles()
- );
- }
-
-}
+++ /dev/null
-package org.opendaylight.controller.sample.zeromq.test.it;
-
-import static org.junit.Assert.*;
-import static org.ops4j.pax.exam.CoreOptions.junitBundles;
-import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
-import static org.ops4j.pax.exam.CoreOptions.options;
-import static org.ops4j.pax.exam.CoreOptions.systemPackages;
-import static org.ops4j.pax.exam.CoreOptions.systemProperty;
-import static org.ops4j.pax.exam.CoreOptions.maven;
-
-import java.util.Collection;
-
-import javax.inject.Inject;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.CoreOptions;
-import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-
-@RunWith(PaxExam.class)
-public class ServiceProviderController {
-
- public static final String ODL = "org.opendaylight.controller";
- public static final String YANG = "org.opendaylight.yangtools";
- public static final String SAMPLE = "org.opendaylight.controller.samples";
-
- @Test
- public void properInitialized() throws Exception {
-
- Thread.sleep(30000); // Waiting for services to get wired.
- assertTrue(true);
- //assertTrue(consumer.createToast(WhiteBread.class, 5));
-
- }
-
-// @Inject
-// BindingAwareBroker broker;
-
-// @Inject
-// ToastConsumer consumer;
-
- @Inject
- BundleContext ctx;
-
- @Configuration
- public Option[] config() {
- return options(systemProperty("osgi.console").value("2401"),
- systemProperty("pub.port").value("5556"),
- systemProperty("sub.port").value("5557"),
- systemProperty("rpc.port").value("5554"),
- systemProperty("pub.ip").value("localhost"),
- mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
- mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
- mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
- mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
-
- //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-common").versionAsInProject(), //
- mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
- mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
- mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
- mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
- mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
- mavenBundle(SAMPLE, "zeromq-test-provider").versionAsInProject(), //
- mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
- mavenBundle(YANG, "concepts").versionAsInProject(),
- mavenBundle(YANG, "yang-binding").versionAsInProject(), //
- mavenBundle(YANG, "yang-common").versionAsInProject(), //
- mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
- mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
- mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
- mavenBundle("com.google.guava", "guava").versionAsInProject(), //
- mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
- junitBundles()
- );
- }
-
-}
+++ /dev/null
-package org.opendaylight.controller.sample.zeromq.provider;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.AbstractProvider;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-
-public class ExampleProvider extends AbstractProvider implements RpcImplementation {
-
- private final URI namespace = URI.create("http://cisco.com/example");
- private final QName QNAME = new QName(namespace,"heartbeat");
- private RpcRegistration reg;
-
-
- @Override
- public void onSessionInitiated(ProviderSession session) {
- //Adding heartbeat 10 times just to make sure subscriber get it
- for (int i=0;i<10;i++){
- System.out.println("ExampleProvider: Adding " + QNAME + " " + i);
- reg = session.addRpcImplementation(QNAME, this);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- @Override
- public Set<QName> getSupportedRpcs() {
- return Collections.singleton(QNAME);
- }
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
- if(QNAME.equals(rpc)) {
- RpcResult<CompositeNode> output = Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
- return output;
- }
- RpcResult<CompositeNode> output = Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
- return output;
- }
-
- @Override
- protected void stopImpl(BundleContext context) {
- if(reg != null) {
- try {
- reg.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
-}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.opendaylight.controller</groupId>
<extensions>true</extensions>
<configuration>
<instructions>
-
+ <Export-Package>
+ org.opendaylight.controller.sal.connector.remoterpc.api,
+ org.opendaylight.controller.sal.connector.remoterpc.impl
+ </Export-Package>
<Import-Package>
javax.xml.bind.annotation,
org.opendaylight.controller.sal.core,
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal</artifactId>
- <version>0.5.1-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>