<module>clustered-data-store/implementation</module>
<module>clustered-data-store/integrationtest</module>
+ <module>sal-zeromq-connector</module>
+ <module>test</module>
</modules>
<properties>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version>
</plugin>
- <!--This plugin's configuration is used to store Eclipse
- m2e settings only. It has no influence on the Maven build itself. -->
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<ignore></ignore>
</action>
</pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.jacoco</groupId>
+ <artifactId>
+ jacoco-maven-plugin
+ </artifactId>
+ <versionRange>
+ [0.5.3.201107060350,)
+ </versionRange>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
* @param <T> Rpc Type
* @param <D> Data Type
*/
-public interface RpcRouter<C,R,T,D> {
+public interface RpcRouter<C,T,R,D> {
- Future<RpcReply<D>> sendRpc(RpcRequest<C, R, T, D> input);
+ Future<RpcReply<D>> sendRpc(RpcRequest<C, T, R, D> input);
/**
* @param <T> Rpc Type
* @param <D> Data Type
*/
- public interface RpcRequest<C,R,T,D> {
+ public interface RpcRequest<C,T,R,D> {
- RouteIdentifier<C,R,T> getRoutingInformation();
+ RouteIdentifier<C,T,R> getRoutingInformation();
D getPayload();
}
- public interface RouteIdentifier<C,R,T> {
+ public interface RouteIdentifier<C,T,R> {
C getContext(); // defines a routing table (e.g. NodeContext)
- R getRoute(); // e.g. (node identity)
T getType(); // rpc type
+ R getRoute(); // e.g. (node identity)
}
public interface RpcReply<D> {
}
override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
- if(rpcType == null) {
+ if (rpcType == null) {
throw new IllegalArgumentException("rpcType must not be null");
}
- if(implementation == null) {
+ if (implementation == null) {
throw new IllegalArgumentException("Implementation must not be null");
}
broker.addRpcImplementation(rpcType, implementation);
rpcImpls.put(rpcType, implementation);
-
+
return new RpcRegistrationImpl(rpcType, implementation, this);
}
def removeRpcImplementation(RpcRegistrationImpl implToRemove) throws IllegalArgumentException {
val localImpl = rpcImpls.get(implToRemove.type);
- if(localImpl !== implToRemove.instance) {
- throw new IllegalStateException(
- "Implementation was not registered in this session");
+ if (localImpl !== implToRemove.instance) {
+ throw new IllegalStateException("Implementation was not registered in this session");
}
- broker.removeRpcImplementation(implToRemove.type,localImpl);
+ broker.removeRpcImplementation(implToRemove.type, localImpl);
rpcImpls.remove(implToRemove.type);
}
-
+
override close() {
- removeAllRpcImlementations
- super.close
+ removeAllRpcImlementations
+ super.close
}
-
+
private def removeAllRpcImlementations() {
- if (!rpcImpls.empty) {
- for (entry : rpcImpls.entrySet) {
- broker.removeRpcImplementation(entry.key,entry.value);
- }
- rpcImpls.clear
- }
+ if (!rpcImpls.empty) {
+ for (entry : rpcImpls.entrySet) {
+ broker.removeRpcImplementation(entry.key, entry.value);
+ }
+ rpcImpls.clear
+ }
}
-
+
override addMountedRpcImplementation(QName rpcType, RpcImplementation implementation) {
throw new UnsupportedOperationException("TODO: auto-generated method stub")
}
-
+
override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
throw new UnsupportedOperationException("TODO: auto-generated method stub")
}
-
+
override getSupportedRpcs() {
broker.getSupportedRpcs();
}
-
+
override addRpcRegistrationListener(RpcRegistrationListener listener) {
broker.addRpcRegistrationListener(listener);
}
}
class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
-
- @Property
- val QName type
-
- private var ProviderContextImpl context
-
- new(QName type, RpcImplementation instance, ProviderContextImpl ctx) {
- super(instance)
- _type = type
- context = ctx
- }
-
- override protected removeRegistration() {
- context.removeRpcImplementation(this)
- context = null
- }
+
+ @Property
+ val QName type
+
+ private var ProviderContextImpl context
+
+ new(QName type, RpcImplementation instance, ProviderContextImpl ctx) {
+ super(instance)
+ _type = type
+ context = ctx
+ }
+
+ override protected removeRegistration() {
+ context.removeRpcImplementation(this)
+ context = null
+ }
}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+
+ <artifactId>sal-zeromq-connector</artifactId>
+ <packaging>bundle</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>${bundle.plugin.version}</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Import-Package>
+ org.opendaylight.controller.sal.connector.api,
+ org.opendaylight.controller.sal.core.api,
+ org.opendaylight.yangtools.concepts;version="[0.1,1)",
+ org.opendaylight.yangtools.yang.common;version="[0.5,1)",
+ org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
+ org.zeromq;version="[0.3,1)"
+ </Import-Package>
+ <Bundle-Activator>org.opendaylight.controller.sal.connector.zeromq.Activator</Bundle-Activator>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.northbound</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-connector-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </dependency>
+
+ </dependencies>
+ <repositories>
+ <repository>
+ <id>sonatype-nexus-snapshots</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+</project>
--- /dev/null
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.osgi.framework.BundleContext;
+
+public class Activator extends AbstractProvider {
+
+ ZeroMqRpcRouter router;
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ router = ZeroMqRpcRouter.getInstance();
+ router.setBrokerSession(session);
+ router.start();
+ }
+
+ @Override
+ protected void stopImpl(BundleContext context) {
+ router.stop();
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.zeromq;
+
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.*;
+import java.util.Arrays;
+
+public class Message implements Serializable {
+
+ public enum MessageType {
+ ANNOUNCE((byte) 0),
+ HEARTBEAT((byte) 1),
+ REQUEST((byte) 2),
+ RESPONSE((byte) 3);
+
+ private final byte type;
+
+ MessageType(byte type) {
+ this.type = type;
+ }
+
+ public byte getType(){
+ return this.type;
+ }
+ }
+
+ private MessageType type;
+ private String sender;
+ private RpcRouter.RouteIdentifier route;
+ private Object payload;
+
+ public MessageType getType() {
+ return type;
+ }
+
+ public void setType(MessageType type) {
+ this.type = type;
+ }
+
+ public String getSender() {
+ return sender;
+ }
+
+ public void setSender(String sender) {
+ this.sender = sender;
+ }
+
+ public RpcRouter.RouteIdentifier getRoute() {
+ return route;
+ }
+
+ public void setRoute(RpcRouter.RouteIdentifier route) {
+ this.route = route;
+ }
+
+ public Object getPayload() {
+ return payload;
+ }
+
+ public void setPayload(Object payload) {
+ this.payload = payload;
+ }
+
+ @Override
+ public String toString() {
+ return "Message{" +
+ "type=" + type +
+ ", sender='" + sender + '\'' +
+ ", route=" + route +
+ ", payload=" + payload +
+ '}';
+ }
+
+ /**
+ * Converts any {@link Serializable} object to byte[]
+ *
+ * @param obj
+ * @return
+ * @throws IOException
+ */
+ public static byte[] serialize(Object obj) throws IOException {
+ ByteArrayOutputStream b = new ByteArrayOutputStream();
+ ObjectOutputStream o = new ObjectOutputStream(b);
+ o.writeObject(obj);
+ return b.toByteArray();
+ }
+
+ /**
+ * Converts byte[] to a java object
+ *
+ * @param bytes
+ * @return
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream b = new ByteArrayInputStream(bytes);
+ ObjectInputStream o = new ObjectInputStream(b);
+ return o.readObject();
+ }
+
+ public static byte[] toJsonBytes(Message m){
+ ObjectMapper o = new ObjectMapper();
+ try {
+ System.out.println(o.writeValueAsString(m));
+ return o.writeValueAsBytes(m);
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return null;
+ }
+
+ public static Message fromJsonBytes(byte [] bytes){
+
+ ObjectMapper o = new ObjectMapper();
+ Message m = null;
+ try {
+ m = o.readValue(bytes, Message.class);
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return m;
+ }
+
+ public static class Response extends Message implements RpcRouter.RpcReply {
+ private ResponseCode code; // response code
+
+ public static enum ResponseCode {
+ SUCCESS(200), BADREQUEST(400), TIMEOUT(408), GONE(410), SERVERERROR(500), SERVICEUNAVAILABLE(503);
+
+ private int code;
+
+ ResponseCode(int code) {
+ this.code = code;
+ }
+ }
+
+ public ResponseCode getCode() {
+ return code;
+ }
+
+ public void setCode(ResponseCode code) {
+ this.code = code;
+ }
+ }
+
+}
+
--- /dev/null
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+/**
+ * User: abhishk2
+ */
+public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
+
+ private QName context;
+ private QName type;
+ private InstanceIdentifier route;
+
+ @Override
+ public QName getContext() {
+ return this.context;
+ }
+
+ @Override
+ public QName getType() {
+ return this.type;
+ }
+
+ @Override
+ public InstanceIdentifier getRoute() {
+ return this.route;
+ }
+
+ public void setContext(QName context) {
+ this.context = context;
+ }
+
+ public void setType(QName type) {
+ this.type = type;
+ }
+
+ public void setRoute(InstanceIdentifier route) {
+ this.route = route;
+ }
+
+ @Override
+ public String toString() {
+ return "RouteIdentifierImpl{" +
+ "context=" + context +
+ ", type=" + type +
+ ", route=" + route +
+ '}';
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: abhishk2
+ * Date: 10/24/13
+ * Time: 4:25 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class RpcReplyImpl implements RpcRouter.RpcReply<Object>,Serializable {
+
+ private Object payload;
+
+ @Override
+ public Object getPayload() {
+ return payload; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setPayload(Object payload){
+ this.payload = payload;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: abhishk2
+ * Date: 10/25/13
+ * Time: 12:32 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class RpcRequestImpl implements RpcRouter.RpcRequest<QName, QName, InstanceIdentifier, Object>,Serializable {
+
+ private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier;
+ private Object payload;
+
+ @Override
+ public RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> getRoutingInformation() {
+ return routeIdentifier;
+ }
+
+ public void setRouteIdentifier(RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier) {
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ @Override
+ public Object getPayload() {
+ return payload;
+ }
+
+ public void setPayload(Object payload) {
+ this.payload = payload;
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.zeromq;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
+
+ private ExecutorService serverPool;
+ private static ExecutorService handlersPool;
+
+ private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
+
+ private ProviderSession brokerSession;
+
+ private ZMQ.Context context;
+ private ZMQ.Socket publisher;
+ private ZMQ.Socket subscriber;
+ private ZMQ.Socket replySocket;
+
+ private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
+
+ private final RpcFacade facade = new RpcFacade();
+ private final RpcListener listener = new RpcListener();
+
+ private String pubPort = System.getProperty("pub.port");//port on which announcements are sent
+ private String subPort = System.getProperty("sub.port");//other controller's pub port
+ private String pubIp = System.getProperty("pub.ip"); //other controller's ip
+ private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received
+
+
+ private ZeroMqRpcRouter() {
+ }
+
+ public static ZeroMqRpcRouter getInstance() {
+ return _instance;
+ }
+
+ public void start() {
+ context = ZMQ.context(2);
+ serverPool = Executors.newSingleThreadExecutor();
+ handlersPool = Executors.newCachedThreadPool();
+ routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
+
+ // Start listening for announce and rpc messages
+ serverPool.execute(receive());
+
+
+ brokerSession.addRpcRegistrationListener(listener);
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for(QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
+ }
+
+
+ }
+
+ public void stop() {
+ if (handlersPool != null) handlersPool.shutdown();
+ if (serverPool != null) serverPool.shutdown();
+ if (publisher != null) {
+ publisher.setLinger(0);
+ publisher.close();
+ }
+ if (replySocket != null) {
+ replySocket.setLinger(0);
+ replySocket.close();
+ }
+ if (subscriber != null) {
+ subscriber.setLinger(0);
+ subscriber.close();
+ }
+ if (context != null) context.term();
+
+
+ }
+
+ private Runnable receive() {
+ return new Runnable() {
+ public void run() {
+ try {
+ // Bind to RPC reply socket
+ replySocket = context.socket(ZMQ.REP);
+ replySocket.bind("tcp://*:" + rpcPort);
+
+ // Bind to publishing controller
+ subscriber = context.socket(ZMQ.SUB);
+ subscriber.connect("tcp://" + pubIp + ":" + subPort);
+ System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]");
+
+ subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
+
+ // Initialize poll set
+ ZMQ.Poller poller = new ZMQ.Poller(2);
+ poller.register(replySocket, ZMQ.Poller.POLLIN);
+ poller.register(subscriber, ZMQ.Poller.POLLIN);
+
+ while (!Thread.currentThread().isInterrupted()) {
+
+ poller.poll(250);
+ //TODO: Fix this
+ if (poller.pollin(0)) {
+ //receive rpc request and reply
+ try {
+ Message req = parseMessage(replySocket);
+ Message resp = new Message();
+ //Call broker to process the message then reply
+ Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+ RpcResult<CompositeNode> result = rpc.get();
+ resp.setType(Message.MessageType.RESPONSE);
+ resp.setSender(getLocalIpAddress() + ":" + rpcPort);
+ resp.setRoute(req.getRoute());
+ resp.setPayload(result.isSuccessful());
+ replySocket.send(Message.serialize(resp));
+
+ } catch (IOException ex) {// | ClassNotFoundException ex) {
+ System.out.println("Rpc request could not be handled" + ex);
+ }
+ }
+ if (poller.pollin(1)) {
+ //get subscription and update routing table
+ //try {
+ Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv());
+ System.out.println("Topic:[" + topic + "]");
+
+ if (subscriber.hasReceiveMore()) {
+ try {
+ Message m = (Message) Message.deserialize(subscriber.recv());
+ System.out.println(m);
+ //TODO: check on msg type or topic. Both should be same. Need to normalize.
+ if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m);
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+//
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ replySocket.setLinger(0);
+ replySocket.close();
+ subscriber.setLinger(0);
+ subscriber.close();
+ }
+ };
+ }
+
+ private void updateRoutingTable(Message msg) {
+ routingTable.put(msg.getRoute(), msg.getSender());
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
+ QName rpcType = route.getType();
+ System.out.println("Routing Table\n" + routingTable);
+
+ RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+ }
+
+ private Message parseMessage(ZMQ.Socket socket) {
+ //Message m = new Message();
+ //socket.setReceiveBufferSize(40000);
+ Message msg = null;
+ try {
+ byte[] bytes = socket.recv();
+ System.out.println("Received bytes:[" + bytes.length + "]");
+ msg = (Message) Message.deserialize(bytes);
+ } catch (Throwable t) {
+ System.out.println("Caught exception");
+ t.printStackTrace();
+ }
+ return msg;
+ /*m.setType((Message.MessageType) Message.deserialize(socket.recv()));
+
+ if (socket.hasReceiveMore()) {
+ m.setSender((String) Message.deserialize(socket.recv()));
+ }
+ if (socket.hasReceiveMore()) {
+ m.setRoute((RouteIdentifier) Message.deserialize(socket.recv()));
+ }
+ if (socket.hasReceiveMore()) {
+ m.setPayload(Message.deserialize(socket.recv()));
+ }
+ return m;*/
+ }
+
+ @Override
+ public Future<RpcReply<Object>> sendRpc(final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
+
+ return handlersPool.submit(new Callable<RpcReply<Object>>() {
+
+ @Override
+ public RpcReply<Object> call() {
+ ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
+ Message req = new Message();
+ Message resp = null;
+ RpcReplyImpl reply = new RpcReplyImpl();
+ requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute()));
+
+ req.setType(Message.MessageType.REQUEST);
+ req.setSender(getLocalIpAddress() + ":" + rpcPort);
+ req.setRoute(input.getRoutingInformation());
+ req.setPayload(input.getPayload());
+ try {
+ requestSocket.send(Message.serialize(req));
+ resp = parseMessage(requestSocket);
+ reply.setPayload(resp.getPayload());
+ } catch (IOException ex) {//| ClassNotFoundException ex) {
+ //Log and ignore
+ System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
+ }
+
+ return reply;
+ }
+ });
+ }
+
+ public void publish(final Message message) {
+ Runnable task = new Runnable() {
+ public void run() {
+ // Bind to publishing port
+ publisher = context.socket(ZMQ.PUB);
+ publisher.bind("tcp://*:" + pubPort);
+ System.out.println("Publisher started at port[" + pubPort + "]");
+ try {
+ Message outMessage = new Message();
+ outMessage.setType(Message.MessageType.ANNOUNCE);
+ outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort);
+ outMessage.setRoute(message.getRoute());
+
+ System.out.println("Sending announcement[" + outMessage + "]");
+ publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
+ publisher.send(Message.serialize(outMessage));
+
+ } catch (IOException ex) {
+ //Log and ignore
+ System.out.println("Error in publishing");
+ ex.printStackTrace();
+ }
+ System.out.println("Published message[" + message + "]");
+ publisher.close();
+ }
+ };
+ handlersPool.execute(task);
+ }
+
+ private String getLocalIpAddress() {
+ String hostAddress = null;
+ Enumeration e = null;
+ try {
+ e = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e1) {
+ e1.printStackTrace();
+ }
+ while (e.hasMoreElements()) {
+
+ NetworkInterface n = (NetworkInterface) e.nextElement();
+ Enumeration ee = n.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress i = (InetAddress) ee.nextElement();
+ if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+ hostAddress = i.getHostAddress();
+ }
+ }
+
+ return hostAddress;
+
+
+ }
+
+
+ private class RpcFacade implements RpcImplementation {
+
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+ RpcRequestImpl request = new RpcRequestImpl();
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setContext(null);
+ routeId.setRoute(null);
+ routeId.setType(rpc);
+
+ request.setRouteIdentifier(routeId);
+ request.setPayload(input);
+ // Create message
+
+ Future<org.opendaylight.controller.sal.connector.api.RpcRouter.RpcReply<Object>> ret = sendRpc(request);
+
+ return null;
+ }
+ }
+
+ private class RpcListener implements RpcRegistrationListener {
+
+ @Override
+ public void onRpcImplementationAdded(QName name) {
+
+ Message msg = new Message();
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+ msg.setRoute(routeId);
+ publish(msg);
+ }
+
+ @Override
+ public void onRpcImplementationRemoved(QName name) {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+ public void setBrokerSession(ProviderSession session) {
+ this.brokerSession = session;
+
+ }
+
+}
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>sal-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <groupId>org.opendaylight.controller</groupId>
+ </parent>
+ <packaging>pom</packaging>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>sal-test-parent</artifactId>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+
+ <modules>
+ <module>zeromq-test-consumer</module>
+ <module>zeromq-test-it</module>
+ <module>zeromq-test-provider</module>
+ </modules>
+
+</project>
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>sal-test-parent</artifactId>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>zeromq-test-consumer</artifactId>
+ <packaging>bundle</packaging>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Bundle-Activator>org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer</Bundle-Activator>
+ <Import-Package>
+ org.opendaylight.controller.sal.core.api,
+ org.opendaylight.yangtools.yang.common;version="[0.5,1)",
+ org.opendaylight.yangtools.yang.data.api,
+ </Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-core-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+package org.opendaylight.controller.sample.zeromq.consumer;
+
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.core.api.AbstractConsumer;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.osgi.framework.BundleContext;
+
+public class ExampleConsumer extends AbstractConsumer {
+
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace,"heartbeat");
+
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ private ConsumerSession session;
+
+
+ @Override
+ public void onSessionInitiated(ConsumerSession session) {
+ this.session = session;
+ executor.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ int count = 0;
+ try {
+ Future<RpcResult<CompositeNode>> future = ExampleConsumer.this.session.rpc(QNAME, null);
+ RpcResult<CompositeNode> result = future.get();
+ System.out.println("Result received. Status is :" + result.isSuccessful());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ }, 0, 10, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void stopImpl(BundleContext context) {
+ // TODO Auto-generated method stub
+ super.stopImpl(context);
+ executor.shutdown();
+ }
+}
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>sal-test-parent</artifactId>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>zeromq-test-it</artifactId>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+
+ <properties>
+ <exam.version>3.0.0</exam.version>
+ <url.version>1.5.0</url.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>maven-paxexam-plugin</artifactId>
+ <version>1.2.4</version>
+ <executions>
+ <execution>
+ <id>generate-config</id>
+ <goals>
+ <goal>generate-depends-file</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings
+ only. It has no influence on the Maven build itself. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.ops4j.pax.exam
+ </groupId>
+ <artifactId>
+ maven-paxexam-plugin
+ </artifactId>
+ <versionRange>
+ [1.2.4,)
+ </versionRange>
+ <goals>
+ <goal>
+ generate-depends-file
+ </goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.thirdparty</groupId>
+ <artifactId>xtend-lib-osgi</artifactId>
+ <version>2.4.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>zeromq-test-provider</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <artifactId>zeromq-test-consumer</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-container-native</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-junit4</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-link-mvn</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>equinoxSDK381</groupId>
+ <artifactId>org.eclipse.osgi</artifactId>
+ <version>3.8.1.v20120830-144521</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-core-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+package org.opendaylight.controller.sample.zeromq.test.it;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.osgi.framework.BundleContext;
+
+import javax.inject.Inject;
+
+import static org.junit.Assert.assertTrue;
+import static org.ops4j.pax.exam.CoreOptions.*;
+
+@RunWith(PaxExam.class)
+public class ServiceConsumerController {
+
+ public static final String ODL = "org.opendaylight.controller";
+ public static final String YANG = "org.opendaylight.yangtools";
+ public static final String SAMPLE = "org.opendaylight.controller.samples";
+
+ @Test
+ public void properInitialized() throws Exception {
+
+ Thread.sleep(30000); // Waiting for services to get wired.
+ assertTrue(true);
+ //assertTrue(consumer.createToast(WhiteBread.class, 5));
+
+ }
+
+// @Inject
+// BindingAwareBroker broker;
+
+// @Inject
+// ToastConsumer consumer;
+
+ @Inject
+ BundleContext ctx;
+
+ @Configuration
+ public Option[] config() {
+ return options(systemProperty("osgi.console").value("2401"),
+ systemProperty("pub.port").value("5557"),
+ systemProperty("sub.port").value("5556"),
+ systemProperty("rpc.port").value("5555"),
+ systemProperty("pub.ip").value("localhost"),
+ mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+ mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+
+ //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-common").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+ mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+ mavenBundle(SAMPLE, "zeromq-test-consumer").versionAsInProject(), //
+ mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
+ mavenBundle(YANG, "concepts").versionAsInProject(),
+ mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+ mavenBundle(YANG, "yang-common").versionAsInProject(), //
+ mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+ mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+ mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+ mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
+ junitBundles()
+ );
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.sample.zeromq.test.it;
+
+import static org.junit.Assert.*;
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.options;
+import static org.ops4j.pax.exam.CoreOptions.systemPackages;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+import static org.ops4j.pax.exam.CoreOptions.maven;
+
+import java.util.Collection;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+
+@RunWith(PaxExam.class)
+public class ServiceProviderController {
+
+ public static final String ODL = "org.opendaylight.controller";
+ public static final String YANG = "org.opendaylight.yangtools";
+ public static final String SAMPLE = "org.opendaylight.controller.samples";
+
+ @Test
+ public void properInitialized() throws Exception {
+
+ Thread.sleep(30000); // Waiting for services to get wired.
+ assertTrue(true);
+ //assertTrue(consumer.createToast(WhiteBread.class, 5));
+
+ }
+
+// @Inject
+// BindingAwareBroker broker;
+
+// @Inject
+// ToastConsumer consumer;
+
+ @Inject
+ BundleContext ctx;
+
+ @Configuration
+ public Option[] config() {
+ return options(systemProperty("osgi.console").value("2401"),
+ systemProperty("pub.port").value("5556"),
+ systemProperty("sub.port").value("5557"),
+ systemProperty("rpc.port").value("5554"),
+ systemProperty("pub.ip").value("localhost"),
+ mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+ mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+
+ //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-common").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+ mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+ mavenBundle(SAMPLE, "zeromq-test-provider").versionAsInProject(), //
+ mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), //
+ mavenBundle(YANG, "concepts").versionAsInProject(),
+ mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+ mavenBundle(YANG, "yang-common").versionAsInProject(), //
+ mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+ mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+ mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+ mavenBundle("org.jeromq", "jeromq").versionAsInProject(),
+ junitBundles()
+ );
+ }
+
+}
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>sal-test-parent</artifactId>
+ <groupId>org.opendaylight.controller.tests</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>zeromq-test-provider</artifactId>
+ <packaging>bundle</packaging>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+ </scm>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Bundle-Activator>org.opendaylight.controller.sample.zeromq.provider.ExampleProvider</Bundle-Activator>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-core-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-zeromq-connector</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ </dependencies>
+</project>
--- /dev/null
+package org.opendaylight.controller.sample.zeromq.provider;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.osgi.framework.BundleContext;
+
+public class ExampleProvider extends AbstractProvider implements RpcImplementation {
+
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace,"heartbeat");
+ private RpcRegistration reg;
+
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ //Adding heartbeat 10 times just to make sure subscriber get it
+ for (int i=0;i<10;i++){
+ System.out.println("ExampleProvider: Adding " + QNAME + " " + i);
+ reg = session.addRpcImplementation(QNAME, this);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return Collections.singleton(QNAME);
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ if(QNAME.equals(rpc)) {
+ RpcResult<CompositeNode> output = Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
+ return output;
+ }
+ RpcResult<CompositeNode> output = Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
+ return output;
+ }
+
+ @Override
+ protected void stopImpl(BundleContext context) {
+ if(reg != null) {
+ try {
+ reg.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+}