Updated ZeroMQ implementation 27/2227/5
authorTony Tkacik <ttkacik@cisco.com>
Mon, 28 Oct 2013 18:30:40 +0000 (19:30 +0100)
committerEd Warnicke <eaw@cisco.com>
Wed, 6 Nov 2013 20:12:14 +0000 (14:12 -0600)
ZeroMQ implementation listens for rpc service implementations
on one controller node and when new implementation is registered
on controller it announces it via message bus.

The receivers of announcement registers itself as the implementation
of the same service in that node, and delegates actual processing
via message bus to originating controller instance.

This allows for deployment where one controller has a bundle
which provides service A, and other which has bundle which
only consumes service A to cooperate.

Change-Id: If5a2220e05858693db6297a7592893a09021e97d
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
18 files changed:
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-connector-api/src/main/java/org/opendaylight/controller/sal/connector/api/RpcRouter.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend
opendaylight/md-sal/sal-zeromq-connector/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java [new file with mode: 0644]
opendaylight/md-sal/test/pom.xml [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-consumer/pom.xml [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-it/pom.xml [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-provider/pom.xml [new file with mode: 0644]
opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java [new file with mode: 0644]

index a6740fabac158248dbd86892613a5942812d1f7a..7f5f10e21b2fe7af1a42267e75bcb632c376278e 100644 (file)
@@ -47,6 +47,8 @@
         <module>clustered-data-store/implementation</module>
         <module>clustered-data-store/integrationtest</module>
 
+        <module>sal-zeromq-connector</module>
+        <module>test</module>
     </modules>
 
     <properties>
                     <artifactId>jacoco-maven-plugin</artifactId>
                     <version>${jacoco.version}</version>
                 </plugin>
-                <!--This plugin's configuration is used to store Eclipse
-                    m2e settings only. It has no influence on the Maven build itself. -->
+                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
                 <plugin>
                     <groupId>org.eclipse.m2e</groupId>
                     <artifactId>lifecycle-mapping</artifactId>
                                         <ignore></ignore>
                                     </action>
                                 </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.jacoco</groupId>
+                                        <artifactId>
+                                            jacoco-maven-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [0.5.3.201107060350,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>prepare-agent</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
                             </pluginExecutions>
                         </lifecycleMappingMetadata>
                     </configuration>
index 4807c4e2007279f73bc4f21d89975d0d3d283c52..08fce5783ef691009b336dfc18fd7c6eb8402374 100644 (file)
@@ -11,11 +11,11 @@ import java.util.concurrent.Future;
  * @param <T> Rpc Type
  * @param <D> Data Type
  */
-public interface RpcRouter<C,R,T,D> {
+public interface RpcRouter<C,T,R,D> {
 
     
     
-    Future<RpcReply<D>> sendRpc(RpcRequest<C, R, T, D> input);
+    Future<RpcReply<D>> sendRpc(RpcRequest<C, T, R, D> input);
     
     
     /**
@@ -27,17 +27,17 @@ public interface RpcRouter<C,R,T,D> {
         * @param <T> Rpc Type
         * @param <D> Data Type
      */
-    public interface RpcRequest<C,R,T,D> {
+    public interface RpcRequest<C,T,R,D> {
 
-        RouteIdentifier<C,R,T> getRoutingInformation();
+        RouteIdentifier<C,T,R> getRoutingInformation();
         D getPayload();
     }
     
-    public interface RouteIdentifier<C,R,T> {
+    public interface RouteIdentifier<C,T,R> {
         
         C getContext(); // defines a routing table (e.g. NodeContext)
-        R getRoute(); // e.g. (node identity)
         T getType(); // rpc type
+        R getRoute(); // e.g. (node identity)
     }
     
     public interface RpcReply<D> {
index 3fdd7065770dac618a2fa5cc15ebeb1c37cb110a..bffc5705962b3fd7f2ad790a11214dee8166cbc7 100644 (file)
@@ -27,75 +27,74 @@ class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession
     }
 
     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
-        if(rpcType == null) {
+        if (rpcType == null) {
             throw new IllegalArgumentException("rpcType must not be null");
         }
-        if(implementation == null) {
+        if (implementation == null) {
             throw new IllegalArgumentException("Implementation must not be null");
         }
         broker.addRpcImplementation(rpcType, implementation);
         rpcImpls.put(rpcType, implementation);
-        
+
         return new RpcRegistrationImpl(rpcType, implementation, this);
     }
 
     def removeRpcImplementation(RpcRegistrationImpl implToRemove) throws IllegalArgumentException {
         val localImpl = rpcImpls.get(implToRemove.type);
-        if(localImpl !== implToRemove.instance) {
-            throw new IllegalStateException(
-                "Implementation was not registered in this session");
+        if (localImpl !== implToRemove.instance) {
+            throw new IllegalStateException("Implementation was not registered in this session");
         }
-        broker.removeRpcImplementation(implToRemove.type,localImpl);
+        broker.removeRpcImplementation(implToRemove.type, localImpl);
         rpcImpls.remove(implToRemove.type);
     }
-    
+
     override close() {
-               removeAllRpcImlementations
-       super.close
+        removeAllRpcImlementations
+        super.close
     }
-    
+
     private def removeAllRpcImlementations() {
-       if (!rpcImpls.empty) {
-               for (entry : rpcImpls.entrySet) {
-                       broker.removeRpcImplementation(entry.key,entry.value);
-               }
-               rpcImpls.clear
-       }
+        if (!rpcImpls.empty) {
+            for (entry : rpcImpls.entrySet) {
+                broker.removeRpcImplementation(entry.key, entry.value);
+            }
+            rpcImpls.clear
+        }
     }
-    
+
     override addMountedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         throw new UnsupportedOperationException("TODO: auto-generated method stub")
     }
-    
+
     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         throw new UnsupportedOperationException("TODO: auto-generated method stub")
     }
-    
+
     override getSupportedRpcs() {
         broker.getSupportedRpcs();
     }
-    
+
     override addRpcRegistrationListener(RpcRegistrationListener listener) {
         broker.addRpcRegistrationListener(listener);
     }
 }
 
 class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
-       
-       @Property
-       val QName type
-       
-       private var ProviderContextImpl context
-       
-       new(QName type, RpcImplementation instance, ProviderContextImpl ctx) {
-               super(instance)
-               _type = type
-               context = ctx
-       }
-       
-       override protected removeRegistration() {
-               context.removeRpcImplementation(this)
-               context = null  
-       }
+
+    @Property
+    val QName type
+
+    private var ProviderContextImpl context
+
+    new(QName type, RpcImplementation instance, ProviderContextImpl ctx) {
+        super(instance)
+        _type = type
+        context = ctx
+    }
+
+    override protected removeRegistration() {
+        context.removeRpcImplementation(this)
+        context = null
+    }
 
 }
diff --git a/opendaylight/md-sal/sal-zeromq-connector/pom.xml b/opendaylight/md-sal/sal-zeromq-connector/pom.xml
new file mode 100644 (file)
index 0000000..3db4a65
--- /dev/null
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-parent</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+
+    <artifactId>sal-zeromq-connector</artifactId>
+    <packaging>bundle</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <version>${bundle.plugin.version}</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Import-Package>
+                        org.opendaylight.controller.sal.connector.api,
+                        org.opendaylight.controller.sal.core.api,
+                        org.opendaylight.yangtools.concepts;version="[0.1,1)",
+                        org.opendaylight.yangtools.yang.common;version="[0.5,1)",
+                        org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
+                        org.zeromq;version="[0.3,1)"
+                        </Import-Package>
+                        <Bundle-Activator>org.opendaylight.controller.sal.connector.zeromq.Activator</Bundle-Activator>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>containermanager</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>commons.northbound</artifactId>
+            <version>0.4.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-binding</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-connector-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jeromq</groupId>
+            <artifactId>jeromq</artifactId>
+            <version>0.3.0-SNAPSHOT</version>
+        </dependency>
+
+    </dependencies>
+    <repositories>
+        <repository>
+            <id>sonatype-nexus-snapshots</id>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+</project>
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java
new file mode 100644 (file)
index 0000000..b8933ec
--- /dev/null
@@ -0,0 +1,23 @@
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.osgi.framework.BundleContext;
+
+public class Activator extends AbstractProvider {
+    
+    ZeroMqRpcRouter router;
+    
+    @Override
+    public void onSessionInitiated(ProviderSession session) {
+        router = ZeroMqRpcRouter.getInstance();
+        router.setBrokerSession(session);
+        router.start();
+    }
+    
+    @Override
+    protected void stopImpl(BundleContext context) {
+       router.stop();
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java
new file mode 100644 (file)
index 0000000..dd87646
--- /dev/null
@@ -0,0 +1,150 @@
+package org.opendaylight.controller.sal.connector.zeromq;
+
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.*;
+import java.util.Arrays;
+
+public class Message implements Serializable {
+
+ public enum MessageType {
+    ANNOUNCE((byte) 0),
+    HEARTBEAT((byte) 1),
+    REQUEST((byte) 2),
+    RESPONSE((byte) 3);
+
+    private final byte type;
+
+    MessageType(byte type) {
+      this.type = type;
+    }
+
+    public byte getType(){
+      return this.type;
+    }
+  }
+
+  private MessageType type;
+  private String sender;
+  private RpcRouter.RouteIdentifier route;
+  private Object payload;
+
+  public MessageType getType() {
+    return type;
+  }
+
+  public void setType(MessageType type) {
+    this.type = type;
+  }
+
+  public String getSender() {
+    return sender;
+  }
+
+  public void setSender(String sender) {
+    this.sender = sender;
+  }
+
+  public RpcRouter.RouteIdentifier getRoute() {
+    return route;
+  }
+
+  public void setRoute(RpcRouter.RouteIdentifier route) {
+    this.route = route;
+  }
+
+  public Object getPayload() {
+    return payload;
+  }
+
+  public void setPayload(Object payload) {
+    this.payload = payload;
+  }
+
+  @Override
+  public String toString() {
+    return "Message{" +
+        "type=" + type +
+        ", sender='" + sender + '\'' +
+        ", route=" + route +
+        ", payload=" + payload +
+        '}';
+  }
+
+  /**
+   * Converts any {@link Serializable} object to byte[]
+   *
+   * @param obj
+   * @return
+   * @throws IOException
+   */
+  public static byte[] serialize(Object obj) throws IOException {
+    ByteArrayOutputStream b = new ByteArrayOutputStream();
+    ObjectOutputStream o = new ObjectOutputStream(b);
+    o.writeObject(obj);
+    return b.toByteArray();
+  }
+
+  /**
+   * Converts byte[] to a java object
+   *
+   * @param bytes
+   * @return
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+    ByteArrayInputStream b = new ByteArrayInputStream(bytes);
+    ObjectInputStream o = new ObjectInputStream(b);
+    return o.readObject();
+  }
+
+  public static byte[] toJsonBytes(Message m){
+    ObjectMapper o = new ObjectMapper();
+    try {
+      System.out.println(o.writeValueAsString(m));
+      return o.writeValueAsBytes(m);
+    } catch (IOException e) {
+      e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+    }
+    return null;
+  }
+
+  public static Message fromJsonBytes(byte [] bytes){
+
+    ObjectMapper o = new ObjectMapper();
+    Message m = null;
+    try {
+      m = o.readValue(bytes, Message.class);
+    } catch (IOException e) {
+      e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+    }
+    return m;
+  }
+
+  public static class Response extends Message implements RpcRouter.RpcReply {
+    private ResponseCode code; // response code
+
+    public static enum ResponseCode {
+      SUCCESS(200), BADREQUEST(400), TIMEOUT(408), GONE(410), SERVERERROR(500), SERVICEUNAVAILABLE(503);
+
+      private int code;
+
+      ResponseCode(int code) {
+        this.code = code;
+      }
+    }
+
+    public ResponseCode getCode() {
+      return code;
+    }
+
+    public void setCode(ResponseCode code) {
+      this.code = code;
+    }
+  }
+
+}
+
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java
new file mode 100644 (file)
index 0000000..8eab01b
--- /dev/null
@@ -0,0 +1,53 @@
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+/**
+ * User: abhishk2
+ */
+public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
+
+  private QName context;
+  private QName type;
+  private InstanceIdentifier route;
+
+  @Override
+  public QName getContext() {
+    return this.context;
+  }
+
+  @Override
+  public QName getType() {
+    return this.type;
+  }
+
+  @Override
+  public InstanceIdentifier getRoute() {
+    return this.route;
+  }
+
+  public void setContext(QName context) {
+    this.context = context;
+  }
+
+  public void setType(QName type) {
+    this.type = type;
+  }
+
+  public void setRoute(InstanceIdentifier route) {
+    this.route = route;
+  }
+
+  @Override
+  public String toString() {
+    return "RouteIdentifierImpl{" +
+        "context=" + context +
+        ", type=" + type +
+        ", route=" + route +
+        '}';
+  }
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java
new file mode 100644 (file)
index 0000000..66ff714
--- /dev/null
@@ -0,0 +1,26 @@
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: abhishk2
+ * Date: 10/24/13
+ * Time: 4:25 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class RpcReplyImpl implements RpcRouter.RpcReply<Object>,Serializable {
+
+  private Object payload;
+
+  @Override
+  public Object getPayload() {
+    return payload;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  public void setPayload(Object payload){
+    this.payload = payload;
+  }
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java
new file mode 100644 (file)
index 0000000..2361ab7
--- /dev/null
@@ -0,0 +1,39 @@
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: abhishk2
+ * Date: 10/25/13
+ * Time: 12:32 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class RpcRequestImpl implements RpcRouter.RpcRequest<QName, QName, InstanceIdentifier, Object>,Serializable {
+
+  private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier;
+  private Object payload;
+
+  @Override
+  public RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> getRoutingInformation() {
+    return routeIdentifier;
+  }
+
+  public void setRouteIdentifier(RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier) {
+    this.routeIdentifier = routeIdentifier;
+  }
+
+  @Override
+  public Object getPayload() {
+    return payload;
+  }
+
+  public void setPayload(Object payload) {
+    this.payload = payload;
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java
new file mode 100644 (file)
index 0000000..7e5efda
--- /dev/null
@@ -0,0 +1,337 @@
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
+
+  private ExecutorService serverPool;
+  private static ExecutorService handlersPool;
+
+  private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
+
+  private ProviderSession brokerSession;
+
+  private ZMQ.Context context;
+  private ZMQ.Socket publisher;
+  private ZMQ.Socket subscriber;
+  private ZMQ.Socket replySocket;
+
+  private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
+
+  private final RpcFacade facade = new RpcFacade();
+  private final RpcListener listener = new RpcListener();
+
+  private String pubPort = System.getProperty("pub.port");//port on which announcements are sent
+  private String subPort = System.getProperty("sub.port");//other controller's pub port
+  private String pubIp = System.getProperty("pub.ip");    //other controller's ip
+  private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received
+
+
+  private ZeroMqRpcRouter() {
+  }
+
+  public static ZeroMqRpcRouter getInstance() {
+    return _instance;
+  }
+
+  public void start() {
+    context = ZMQ.context(2);
+    serverPool = Executors.newSingleThreadExecutor();
+    handlersPool = Executors.newCachedThreadPool();
+    routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
+
+    // Start listening for announce and rpc messages
+    serverPool.execute(receive());
+
+    
+    brokerSession.addRpcRegistrationListener(listener);
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for(QName rpc : currentlySupported) {
+        listener.onRpcImplementationAdded(rpc);
+    }
+
+
+  }
+
+  public void stop() {
+    if (handlersPool != null) handlersPool.shutdown();
+    if (serverPool != null) serverPool.shutdown();
+    if (publisher != null) {
+      publisher.setLinger(0);
+      publisher.close();
+    }
+    if (replySocket != null) {
+      replySocket.setLinger(0);
+      replySocket.close();
+    }
+    if (subscriber != null) {
+      subscriber.setLinger(0);
+      subscriber.close();
+    }
+    if (context != null) context.term();
+
+
+  }
+
+  private Runnable receive() {
+    return new Runnable() {
+      public void run() {
+        try {
+          // Bind to RPC reply socket
+          replySocket = context.socket(ZMQ.REP);
+          replySocket.bind("tcp://*:" + rpcPort);
+
+          // Bind to publishing controller
+          subscriber = context.socket(ZMQ.SUB);
+          subscriber.connect("tcp://" + pubIp + ":" + subPort);
+          System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]");
+
+          subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
+
+          // Initialize poll set
+          ZMQ.Poller poller = new ZMQ.Poller(2);
+          poller.register(replySocket, ZMQ.Poller.POLLIN);
+          poller.register(subscriber, ZMQ.Poller.POLLIN);
+
+          while (!Thread.currentThread().isInterrupted()) {
+
+            poller.poll(250);
+            //TODO: Fix this
+            if (poller.pollin(0)) {
+              //receive rpc request and reply
+              try {
+                Message req = parseMessage(replySocket);
+                Message resp = new Message();
+                //Call broker to process the message then reply
+                Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+                RpcResult<CompositeNode> result = rpc.get();
+                resp.setType(Message.MessageType.RESPONSE);
+                resp.setSender(getLocalIpAddress() + ":" + rpcPort);
+                resp.setRoute(req.getRoute());
+                resp.setPayload(result.isSuccessful());
+                replySocket.send(Message.serialize(resp));
+
+              } catch (IOException ex) {// | ClassNotFoundException ex) {
+                System.out.println("Rpc request could not be handled" + ex);
+              }
+            }
+            if (poller.pollin(1)) {
+              //get subscription and update routing table
+              //try {
+              Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv());
+              System.out.println("Topic:[" + topic + "]");
+
+              if (subscriber.hasReceiveMore()) {
+                try {
+                  Message m = (Message) Message.deserialize(subscriber.recv());
+                  System.out.println(m);
+                  //TODO: check on msg type or topic. Both should be same. Need to normalize.
+                  if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m);
+                } catch (IOException | ClassNotFoundException e) {
+                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+              }
+//
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        replySocket.setLinger(0);
+        replySocket.close();
+        subscriber.setLinger(0);
+        subscriber.close();
+      }
+    };
+  }
+
+  private void updateRoutingTable(Message msg) {
+    routingTable.put(msg.getRoute(), msg.getSender());
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
+    QName rpcType = route.getType();
+    System.out.println("Routing Table\n" + routingTable);
+
+    RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+  }
+
+  private Message parseMessage(ZMQ.Socket socket) {
+    //Message m = new Message();
+    //socket.setReceiveBufferSize(40000);
+    Message msg = null;
+    try {
+      byte[] bytes = socket.recv();
+      System.out.println("Received bytes:[" + bytes.length + "]");
+      msg = (Message) Message.deserialize(bytes);
+    } catch (Throwable t) {
+      System.out.println("Caught exception");
+      t.printStackTrace();
+    }
+    return msg;
+    /*m.setType((Message.MessageType) Message.deserialize(socket.recv()));
+
+    if (socket.hasReceiveMore()) {
+      m.setSender((String) Message.deserialize(socket.recv()));
+    }
+    if (socket.hasReceiveMore()) {
+      m.setRoute((RouteIdentifier) Message.deserialize(socket.recv()));
+    }
+    if (socket.hasReceiveMore()) {
+      m.setPayload(Message.deserialize(socket.recv()));
+    }
+    return m;*/
+  }
+
+  @Override
+  public Future<RpcReply<Object>> sendRpc(final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
+
+    return handlersPool.submit(new Callable<RpcReply<Object>>() {
+
+      @Override
+      public RpcReply<Object> call() {
+        ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
+        Message req = new Message();
+        Message resp = null;
+        RpcReplyImpl reply = new RpcReplyImpl();
+        requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute()));
+
+        req.setType(Message.MessageType.REQUEST);
+        req.setSender(getLocalIpAddress() + ":" + rpcPort);
+        req.setRoute(input.getRoutingInformation());
+        req.setPayload(input.getPayload());
+        try {
+          requestSocket.send(Message.serialize(req));
+          resp = parseMessage(requestSocket);
+          reply.setPayload(resp.getPayload());
+        } catch (IOException ex) {//| ClassNotFoundException ex) {
+          //Log and ignore
+          System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
+        }
+
+        return reply;
+      }
+    });
+  }
+
+  public void publish(final Message message) {
+    Runnable task = new Runnable() {
+      public void run() {
+        // Bind to publishing port
+        publisher = context.socket(ZMQ.PUB);
+        publisher.bind("tcp://*:" + pubPort);
+        System.out.println("Publisher started at port[" + pubPort + "]");
+        try {
+          Message outMessage =  new Message();
+          outMessage.setType(Message.MessageType.ANNOUNCE);
+          outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort);
+          outMessage.setRoute(message.getRoute());
+
+          System.out.println("Sending announcement[" + outMessage + "]");
+          publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
+          publisher.send(Message.serialize(outMessage));
+
+        } catch (IOException ex) {
+          //Log and ignore
+          System.out.println("Error in publishing");
+          ex.printStackTrace();
+        }
+        System.out.println("Published message[" + message + "]");
+        publisher.close();
+      }
+    };
+    handlersPool.execute(task);
+  }
+
+  private String getLocalIpAddress() {
+    String hostAddress = null;
+    Enumeration e = null;
+    try {
+      e = NetworkInterface.getNetworkInterfaces();
+    } catch (SocketException e1) {
+      e1.printStackTrace();
+    }
+    while (e.hasMoreElements()) {
+
+      NetworkInterface n = (NetworkInterface) e.nextElement();
+      Enumeration ee = n.getInetAddresses();
+      while (ee.hasMoreElements()) {
+        InetAddress i = (InetAddress) ee.nextElement();
+        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+          hostAddress = i.getHostAddress();
+      }
+    }
+
+    return hostAddress;
+
+
+  }
+
+
+  private class RpcFacade implements RpcImplementation {
+
+
+    @Override
+    public Set<QName> getSupportedRpcs() {
+      return Collections.emptySet();
+    }
+
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+      RpcRequestImpl request = new RpcRequestImpl();
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setContext(null);
+      routeId.setRoute(null);
+      routeId.setType(rpc);
+
+      request.setRouteIdentifier(routeId);
+      request.setPayload(input);
+      // Create message
+
+      Future<org.opendaylight.controller.sal.connector.api.RpcRouter.RpcReply<Object>> ret = sendRpc(request);
+
+      return null;
+    }
+  }
+
+  private class RpcListener implements RpcRegistrationListener {
+
+    @Override
+    public void onRpcImplementationAdded(QName name) {
+
+      Message msg = new Message();
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(name);
+      msg.setRoute(routeId);
+      publish(msg);
+    }
+
+    @Override
+    public void onRpcImplementationRemoved(QName name) {
+      // TODO Auto-generated method stub
+
+    }
+  }
+
+  public void setBrokerSession(ProviderSession session) {
+    this.brokerSession = session;
+
+  }
+
+}
diff --git a/opendaylight/md-sal/test/pom.xml b/opendaylight/md-sal/test/pom.xml
new file mode 100644 (file)
index 0000000..f9e500e
--- /dev/null
@@ -0,0 +1,24 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <artifactId>sal-parent</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <groupId>org.opendaylight.controller</groupId>
+       </parent>
+       <packaging>pom</packaging>
+       <groupId>org.opendaylight.controller.tests</groupId>
+       <artifactId>sal-test-parent</artifactId>
+    <scm>
+      <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+      <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+    </scm>
+
+       <modules>
+               <module>zeromq-test-consumer</module>
+        <module>zeromq-test-it</module>
+        <module>zeromq-test-provider</module>
+       </modules>
+       
+</project>
diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml b/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml
new file mode 100644 (file)
index 0000000..7c6bc21
--- /dev/null
@@ -0,0 +1,85 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>sal-test-parent</artifactId>
+        <groupId>org.opendaylight.controller.tests</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>zeromq-test-consumer</artifactId>
+    <packaging>bundle</packaging>
+    <scm>
+        <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+        <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+    </scm>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Bundle-Activator>org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer</Bundle-Activator>
+                        <Import-Package>
+                            org.opendaylight.controller.sal.core.api,
+                            org.opendaylight.yangtools.yang.common;version="[0.5,1)",
+                            org.opendaylight.yangtools.yang.data.api,
+                        </Import-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-core-api</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>containermanager</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal</artifactId>
+            <version>0.5.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-binding</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java b/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java
new file mode 100644 (file)
index 0000000..a56a7de
--- /dev/null
@@ -0,0 +1,51 @@
+package org.opendaylight.controller.sample.zeromq.consumer;
+
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.core.api.AbstractConsumer;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.osgi.framework.BundleContext;
+
+public class ExampleConsumer extends AbstractConsumer {
+
+    private final URI namespace = URI.create("http://cisco.com/example");
+    private final QName QNAME = new QName(namespace,"heartbeat");
+    
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    private ConsumerSession session;
+    
+    
+    @Override
+    public void onSessionInitiated(ConsumerSession session) {
+        this.session = session;
+        executor.scheduleAtFixedRate(new Runnable() {
+            
+            @Override
+            public void run() {
+                int count = 0;
+                try {
+                    Future<RpcResult<CompositeNode>> future = ExampleConsumer.this.session.rpc(QNAME, null);
+                    RpcResult<CompositeNode> result = future.get();
+                    System.out.println("Result received. Status is :" + result.isSuccessful());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+    }
+    
+    @Override
+    protected void stopImpl(BundleContext context) {
+        // TODO Auto-generated method stub
+        super.stopImpl(context);
+        executor.shutdown();
+    }
+}
diff --git a/opendaylight/md-sal/test/zeromq-test-it/pom.xml b/opendaylight/md-sal/test/zeromq-test-it/pom.xml
new file mode 100644 (file)
index 0000000..56945d1
--- /dev/null
@@ -0,0 +1,184 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>sal-test-parent</artifactId>
+        <groupId>org.opendaylight.controller.tests</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>zeromq-test-it</artifactId>
+    <scm>
+        <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+        <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+    </scm>
+
+    <properties>
+        <exam.version>3.0.0</exam.version>
+        <url.version>1.5.0</url.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.ops4j.pax.exam</groupId>
+                <artifactId>maven-paxexam-plugin</artifactId>
+                <version>1.2.4</version>
+                <executions>
+                    <execution>
+                        <id>generate-config</id>
+                        <goals>
+                            <goal>generate-depends-file</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <pluginManagement>
+            <plugins>
+                <!--This plugin's configuration is used to store Eclipse m2e settings 
+                    only. It has no influence on the Maven build itself. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>
+                                            org.ops4j.pax.exam
+                                        </groupId>
+                                        <artifactId>
+                                            maven-paxexam-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [1.2.4,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>
+                                                generate-depends-file
+                                            </goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore></ignore>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+            <artifactId>xtend-lib-osgi</artifactId>
+            <version>2.4.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller.tests</groupId>
+            <artifactId>zeromq-test-provider</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller.tests</groupId>
+            <artifactId>zeromq-test-consumer</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-broker-impl</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-container-native</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-junit4</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-link-mvn</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>equinoxSDK381</groupId>
+            <artifactId>org.eclipse.osgi</artifactId>
+            <version>3.8.1.v20120830-144521</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.9</version>
+        </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-binding-api</artifactId>
+        <version>1.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-common-util</artifactId>
+        <version>1.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-core-api</artifactId>
+        <version>1.0-SNAPSHOT</version>
+      </dependency>
+
+
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>containermanager</artifactId>
+        <version>0.5.1-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal</artifactId>
+        <version>0.5.1-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yang-binding</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yang-common</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yang-data-api</artifactId>
+      </dependency>
+
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-common-util</artifactId>
+        <version>1.0-SNAPSHOT</version>
+      </dependency>
+    </dependencies>
+</project>
diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java
new file mode 100644 (file)
index 0000000..c17b143
--- /dev/null
@@ -0,0 +1,75 @@
+package org.opendaylight.controller.sample.zeromq.test.it;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.osgi.framework.BundleContext;
+
+import javax.inject.Inject;
+
+import static org.junit.Assert.assertTrue;
+import static org.ops4j.pax.exam.CoreOptions.*;
+
+@RunWith(PaxExam.class)
+public class ServiceConsumerController {
+
+    public static final String ODL = "org.opendaylight.controller";
+    public static final String YANG = "org.opendaylight.yangtools";
+    public static final String SAMPLE = "org.opendaylight.controller.samples";
+
+    @Test
+    public void properInitialized() throws Exception {
+
+        Thread.sleep(30000); // Waiting for services to get wired.
+        assertTrue(true);
+        //assertTrue(consumer.createToast(WhiteBread.class, 5));
+
+    }
+
+//    @Inject
+//    BindingAwareBroker broker;
+
+//    @Inject
+//    ToastConsumer consumer;
+
+    @Inject
+    BundleContext ctx;
+
+    @Configuration
+    public Option[] config() {
+        return options(systemProperty("osgi.console").value("2401"),
+                systemProperty("pub.port").value("5557"),
+                systemProperty("sub.port").value("5556"),
+                systemProperty("rpc.port").value("5555"),
+                systemProperty("pub.ip").value("localhost"),
+                mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+                mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+                mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+                mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+               
+                //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+                mavenBundle(ODL, "sal-common").versionAsInProject(), //
+                mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+                mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+                mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+                mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+                mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+                mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+                mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+                mavenBundle(SAMPLE, "zeromq-test-consumer").versionAsInProject(), //
+                mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
+                mavenBundle(YANG, "concepts").versionAsInProject(),
+                mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+                mavenBundle(YANG, "yang-common").versionAsInProject(), //
+                mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+                mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+                mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+                mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+                mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
+                junitBundles()
+                );
+    }
+
+}
diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java
new file mode 100644 (file)
index 0000000..2d28b0b
--- /dev/null
@@ -0,0 +1,86 @@
+package org.opendaylight.controller.sample.zeromq.test.it;
+
+import static org.junit.Assert.*;
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.options;
+import static org.ops4j.pax.exam.CoreOptions.systemPackages;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+import static org.ops4j.pax.exam.CoreOptions.maven;
+
+import java.util.Collection;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+
+@RunWith(PaxExam.class)
+public class ServiceProviderController {
+
+    public static final String ODL = "org.opendaylight.controller";
+    public static final String YANG = "org.opendaylight.yangtools";
+    public static final String SAMPLE = "org.opendaylight.controller.samples";
+
+    @Test
+    public void properInitialized() throws Exception {
+
+        Thread.sleep(30000); // Waiting for services to get wired.
+        assertTrue(true);
+        //assertTrue(consumer.createToast(WhiteBread.class, 5));
+
+    }
+
+//    @Inject
+//    BindingAwareBroker broker;
+
+//    @Inject
+//    ToastConsumer consumer;
+
+    @Inject
+    BundleContext ctx;
+
+    @Configuration
+    public Option[] config() {
+        return options(systemProperty("osgi.console").value("2401"),
+                systemProperty("pub.port").value("5556"),
+                systemProperty("sub.port").value("5557"),
+                systemProperty("rpc.port").value("5554"),
+                systemProperty("pub.ip").value("localhost"),
+                mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+                mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+                mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+                mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+               
+                //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+                mavenBundle(ODL, "sal-common").versionAsInProject(), //
+                mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+                mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+                mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+                mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+                mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+                mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+                mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+                mavenBundle(SAMPLE, "zeromq-test-provider").versionAsInProject(), //
+                mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
+                mavenBundle(YANG, "concepts").versionAsInProject(),
+                mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+                mavenBundle(YANG, "yang-common").versionAsInProject(), //
+                mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+                mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+                mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+                mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+                mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
+                junitBundles()
+                );
+    }
+
+}
diff --git a/opendaylight/md-sal/test/zeromq-test-provider/pom.xml b/opendaylight/md-sal/test/zeromq-test-provider/pom.xml
new file mode 100644 (file)
index 0000000..10e15aa
--- /dev/null
@@ -0,0 +1,86 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+        <artifactId>sal-test-parent</artifactId>
+        <groupId>org.opendaylight.controller.tests</groupId>
+        <version>1.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>zeromq-test-provider</artifactId>
+  <packaging>bundle</packaging>
+  <scm>
+    <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+    <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+    <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+  </scm>
+
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <configuration>
+          <instructions>
+            <Bundle-Activator>org.opendaylight.controller.sample.zeromq.provider.ExampleProvider</Bundle-Activator>
+          </instructions>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-binding-api</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-util</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-core-api</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>containermanager</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-binding</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-util</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-zeromq-connector</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git a/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java
new file mode 100644 (file)
index 0000000..ec7d7a8
--- /dev/null
@@ -0,0 +1,67 @@
+package org.opendaylight.controller.sample.zeromq.provider;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.osgi.framework.BundleContext;
+
+public class ExampleProvider extends AbstractProvider implements RpcImplementation  {
+
+    private final URI namespace = URI.create("http://cisco.com/example");
+    private final QName QNAME = new QName(namespace,"heartbeat");
+    private RpcRegistration reg;
+    
+    
+    @Override
+    public void onSessionInitiated(ProviderSession session) {
+      //Adding heartbeat 10 times just to make sure subscriber get it
+      for (int i=0;i<10;i++){
+        System.out.println("ExampleProvider: Adding " + QNAME + " " + i);
+        reg = session.addRpcImplementation(QNAME, this);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+      }
+    }
+    
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        return Collections.singleton(QNAME);
+    }
+    
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+        if(QNAME.equals(rpc)) {
+            RpcResult<CompositeNode> output = Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
+            return output;
+        }
+        RpcResult<CompositeNode> output = Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
+        return output;
+    }
+    
+    @Override
+    protected void stopImpl(BundleContext context) {
+     if(reg != null) {
+         try {
+            reg.close();
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+     }
+    }
+
+}