From 12762ff78528a8e6aa017f78121f13c21a7aea7f Mon Sep 17 00:00:00 2001 From: Tony Tkacik Date: Mon, 28 Oct 2013 19:30:40 +0100 Subject: [PATCH] Updated ZeroMQ implementation ZeroMQ implementation listens for rpc service implementations on one controller node and when new implementation is registered on controller it announces it via message bus. The receivers of announcement registers itself as the implementation of the same service in that node, and delegates actual processing via message bus to originating controller instance. This allows for deployment where one controller has a bundle which provides service A, and other which has bundle which only consumes service A to cooperate. Change-Id: If5a2220e05858693db6297a7592893a09021e97d Signed-off-by: Tony Tkacik --- opendaylight/md-sal/pom.xml | 22 +- .../sal/connector/api/RpcRouter.java | 12 +- .../sal/dom/broker/ProviderContextImpl.xtend | 73 ++-- .../md-sal/sal-zeromq-connector/pom.xml | 96 +++++ .../sal/connector/zeromq/Activator.java | 23 ++ .../sal/connector/zeromq/Message.java | 150 ++++++++ .../connector/zeromq/RouteIdentifierImpl.java | 53 +++ .../sal/connector/zeromq/RpcReplyImpl.java | 26 ++ .../sal/connector/zeromq/RpcRequestImpl.java | 39 ++ .../sal/connector/zeromq/ZeroMqRpcRouter.java | 337 ++++++++++++++++++ opendaylight/md-sal/test/pom.xml | 24 ++ .../md-sal/test/zeromq-test-consumer/pom.xml | 85 +++++ .../zeromq/consumer/ExampleConsumer.java | 51 +++ .../md-sal/test/zeromq-test-it/pom.xml | 184 ++++++++++ .../test/it/ServiceConsumerController.java | 75 ++++ .../test/it/ServiceProviderController.java | 86 +++++ .../md-sal/test/zeromq-test-provider/pom.xml | 86 +++++ .../zeromq/provider/ExampleProvider.java | 67 ++++ 18 files changed, 1444 insertions(+), 45 deletions(-) create mode 100644 opendaylight/md-sal/sal-zeromq-connector/pom.xml create mode 100644 opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java create mode 100644 opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java create mode 100644 opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java create mode 100644 opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java create mode 100644 opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java create mode 100644 opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java create mode 100644 opendaylight/md-sal/test/pom.xml create mode 100644 opendaylight/md-sal/test/zeromq-test-consumer/pom.xml create mode 100644 opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java create mode 100644 opendaylight/md-sal/test/zeromq-test-it/pom.xml create mode 100644 opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java create mode 100644 opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java create mode 100644 opendaylight/md-sal/test/zeromq-test-provider/pom.xml create mode 100644 opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index a6740fabac..7f5f10e21b 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -47,6 +47,8 @@ clustered-data-store/implementation clustered-data-store/integrationtest + sal-zeromq-connector + test @@ -319,8 +321,7 @@ jacoco-maven-plugin ${jacoco.version} - + org.eclipse.m2e lifecycle-mapping @@ -349,6 +350,23 @@ + + + org.jacoco + + jacoco-maven-plugin + + + [0.5.3.201107060350,) + + + prepare-agent + + + + + + diff --git a/opendaylight/md-sal/sal-connector-api/src/main/java/org/opendaylight/controller/sal/connector/api/RpcRouter.java b/opendaylight/md-sal/sal-connector-api/src/main/java/org/opendaylight/controller/sal/connector/api/RpcRouter.java index 4807c4e200..08fce5783e 100644 --- a/opendaylight/md-sal/sal-connector-api/src/main/java/org/opendaylight/controller/sal/connector/api/RpcRouter.java +++ b/opendaylight/md-sal/sal-connector-api/src/main/java/org/opendaylight/controller/sal/connector/api/RpcRouter.java @@ -11,11 +11,11 @@ import java.util.concurrent.Future; * @param Rpc Type * @param Data Type */ -public interface RpcRouter { +public interface RpcRouter { - Future> sendRpc(RpcRequest input); + Future> sendRpc(RpcRequest input); /** @@ -27,17 +27,17 @@ public interface RpcRouter { * @param Rpc Type * @param Data Type */ - public interface RpcRequest { + public interface RpcRequest { - RouteIdentifier getRoutingInformation(); + RouteIdentifier getRoutingInformation(); D getPayload(); } - public interface RouteIdentifier { + public interface RouteIdentifier { C getContext(); // defines a routing table (e.g. NodeContext) - R getRoute(); // e.g. (node identity) T getType(); // rpc type + R getRoute(); // e.g. (node identity) } public interface RpcReply { diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend index 3fdd706577..bffc570596 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend @@ -27,75 +27,74 @@ class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession } override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException { - if(rpcType == null) { + if (rpcType == null) { throw new IllegalArgumentException("rpcType must not be null"); } - if(implementation == null) { + if (implementation == null) { throw new IllegalArgumentException("Implementation must not be null"); } broker.addRpcImplementation(rpcType, implementation); rpcImpls.put(rpcType, implementation); - + return new RpcRegistrationImpl(rpcType, implementation, this); } def removeRpcImplementation(RpcRegistrationImpl implToRemove) throws IllegalArgumentException { val localImpl = rpcImpls.get(implToRemove.type); - if(localImpl !== implToRemove.instance) { - throw new IllegalStateException( - "Implementation was not registered in this session"); + if (localImpl !== implToRemove.instance) { + throw new IllegalStateException("Implementation was not registered in this session"); } - broker.removeRpcImplementation(implToRemove.type,localImpl); + broker.removeRpcImplementation(implToRemove.type, localImpl); rpcImpls.remove(implToRemove.type); } - + override close() { - removeAllRpcImlementations - super.close + removeAllRpcImlementations + super.close } - + private def removeAllRpcImlementations() { - if (!rpcImpls.empty) { - for (entry : rpcImpls.entrySet) { - broker.removeRpcImplementation(entry.key,entry.value); - } - rpcImpls.clear - } + if (!rpcImpls.empty) { + for (entry : rpcImpls.entrySet) { + broker.removeRpcImplementation(entry.key, entry.value); + } + rpcImpls.clear + } } - + override addMountedRpcImplementation(QName rpcType, RpcImplementation implementation) { throw new UnsupportedOperationException("TODO: auto-generated method stub") } - + override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) { throw new UnsupportedOperationException("TODO: auto-generated method stub") } - + override getSupportedRpcs() { broker.getSupportedRpcs(); } - + override addRpcRegistrationListener(RpcRegistrationListener listener) { broker.addRpcRegistrationListener(listener); } } class RpcRegistrationImpl extends AbstractObjectRegistration implements RpcRegistration { - - @Property - val QName type - - private var ProviderContextImpl context - - new(QName type, RpcImplementation instance, ProviderContextImpl ctx) { - super(instance) - _type = type - context = ctx - } - - override protected removeRegistration() { - context.removeRpcImplementation(this) - context = null - } + + @Property + val QName type + + private var ProviderContextImpl context + + new(QName type, RpcImplementation instance, ProviderContextImpl ctx) { + super(instance) + _type = type + context = ctx + } + + override protected removeRegistration() { + context.removeRpcImplementation(this) + context = null + } } diff --git a/opendaylight/md-sal/sal-zeromq-connector/pom.xml b/opendaylight/md-sal/sal-zeromq-connector/pom.xml new file mode 100644 index 0000000000..3db4a65840 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.0-SNAPSHOT + + + + sal-zeromq-connector + bundle + + + + + org.apache.felix + maven-bundle-plugin + ${bundle.plugin.version} + true + + + + org.opendaylight.controller.sal.connector.api, + org.opendaylight.controller.sal.core.api, + org.opendaylight.yangtools.concepts;version="[0.1,1)", + org.opendaylight.yangtools.yang.common;version="[0.5,1)", + org.opendaylight.yangtools.yang.data.api;version="[0.5,1)", + org.zeromq;version="[0.3,1)" + + org.opendaylight.controller.sal.connector.zeromq.Activator + + + + + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + org.opendaylight.controller + commons.northbound + 0.4.1-SNAPSHOT + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.controller + sal-connector-api + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + + junit + junit + + + org.jeromq + jeromq + 0.3.0-SNAPSHOT + + + + + + sonatype-nexus-snapshots + https://oss.sonatype.org/content/repositories/snapshots + + false + + + true + + + + + diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java new file mode 100644 index 0000000000..b8933ec615 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java @@ -0,0 +1,23 @@ +package org.opendaylight.controller.sal.connector.zeromq; + +import org.opendaylight.controller.sal.core.api.AbstractProvider; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.osgi.framework.BundleContext; + +public class Activator extends AbstractProvider { + + ZeroMqRpcRouter router; + + @Override + public void onSessionInitiated(ProviderSession session) { + router = ZeroMqRpcRouter.getInstance(); + router.setBrokerSession(session); + router.start(); + } + + @Override + protected void stopImpl(BundleContext context) { + router.stop(); + } + +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java new file mode 100644 index 0000000000..dd87646ed6 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java @@ -0,0 +1,150 @@ +package org.opendaylight.controller.sal.connector.zeromq; + + +import org.codehaus.jackson.map.ObjectMapper; +import org.opendaylight.controller.sal.connector.api.RpcRouter; + +import java.io.*; +import java.util.Arrays; + +public class Message implements Serializable { + + public enum MessageType { + ANNOUNCE((byte) 0), + HEARTBEAT((byte) 1), + REQUEST((byte) 2), + RESPONSE((byte) 3); + + private final byte type; + + MessageType(byte type) { + this.type = type; + } + + public byte getType(){ + return this.type; + } + } + + private MessageType type; + private String sender; + private RpcRouter.RouteIdentifier route; + private Object payload; + + public MessageType getType() { + return type; + } + + public void setType(MessageType type) { + this.type = type; + } + + public String getSender() { + return sender; + } + + public void setSender(String sender) { + this.sender = sender; + } + + public RpcRouter.RouteIdentifier getRoute() { + return route; + } + + public void setRoute(RpcRouter.RouteIdentifier route) { + this.route = route; + } + + public Object getPayload() { + return payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } + + @Override + public String toString() { + return "Message{" + + "type=" + type + + ", sender='" + sender + '\'' + + ", route=" + route + + ", payload=" + payload + + '}'; + } + + /** + * Converts any {@link Serializable} object to byte[] + * + * @param obj + * @return + * @throws IOException + */ + public static byte[] serialize(Object obj) throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + ObjectOutputStream o = new ObjectOutputStream(b); + o.writeObject(obj); + return b.toByteArray(); + } + + /** + * Converts byte[] to a java object + * + * @param bytes + * @return + * @throws IOException + * @throws ClassNotFoundException + */ + public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException { + ByteArrayInputStream b = new ByteArrayInputStream(bytes); + ObjectInputStream o = new ObjectInputStream(b); + return o.readObject(); + } + + public static byte[] toJsonBytes(Message m){ + ObjectMapper o = new ObjectMapper(); + try { + System.out.println(o.writeValueAsString(m)); + return o.writeValueAsBytes(m); + } catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + return null; + } + + public static Message fromJsonBytes(byte [] bytes){ + + ObjectMapper o = new ObjectMapper(); + Message m = null; + try { + m = o.readValue(bytes, Message.class); + } catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + return m; + } + + public static class Response extends Message implements RpcRouter.RpcReply { + private ResponseCode code; // response code + + public static enum ResponseCode { + SUCCESS(200), BADREQUEST(400), TIMEOUT(408), GONE(410), SERVERERROR(500), SERVICEUNAVAILABLE(503); + + private int code; + + ResponseCode(int code) { + this.code = code; + } + } + + public ResponseCode getCode() { + return code; + } + + public void setCode(ResponseCode code) { + this.code = code; + } + } + +} + diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java new file mode 100644 index 0000000000..8eab01b1fb --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java @@ -0,0 +1,53 @@ +package org.opendaylight.controller.sal.connector.zeromq; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +import java.io.Serializable; + +/** + * User: abhishk2 + */ +public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier,Serializable { + + private QName context; + private QName type; + private InstanceIdentifier route; + + @Override + public QName getContext() { + return this.context; + } + + @Override + public QName getType() { + return this.type; + } + + @Override + public InstanceIdentifier getRoute() { + return this.route; + } + + public void setContext(QName context) { + this.context = context; + } + + public void setType(QName type) { + this.type = type; + } + + public void setRoute(InstanceIdentifier route) { + this.route = route; + } + + @Override + public String toString() { + return "RouteIdentifierImpl{" + + "context=" + context + + ", type=" + type + + ", route=" + route + + '}'; + } +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java new file mode 100644 index 0000000000..66ff7148a5 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java @@ -0,0 +1,26 @@ +package org.opendaylight.controller.sal.connector.zeromq; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; + +import java.io.Serializable; + +/** + * Created with IntelliJ IDEA. + * User: abhishk2 + * Date: 10/24/13 + * Time: 4:25 PM + * To change this template use File | Settings | File Templates. + */ +public class RpcReplyImpl implements RpcRouter.RpcReply,Serializable { + + private Object payload; + + @Override + public Object getPayload() { + return payload; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setPayload(Object payload){ + this.payload = payload; + } +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java new file mode 100644 index 0000000000..2361ab76c3 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java @@ -0,0 +1,39 @@ +package org.opendaylight.controller.sal.connector.zeromq; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +import java.io.Serializable; + +/** + * Created with IntelliJ IDEA. + * User: abhishk2 + * Date: 10/25/13 + * Time: 12:32 PM + * To change this template use File | Settings | File Templates. + */ +public class RpcRequestImpl implements RpcRouter.RpcRequest,Serializable { + + private RpcRouter.RouteIdentifier routeIdentifier; + private Object payload; + + @Override + public RpcRouter.RouteIdentifier getRoutingInformation() { + return routeIdentifier; + } + + public void setRouteIdentifier(RpcRouter.RouteIdentifier routeIdentifier) { + this.routeIdentifier = routeIdentifier; + } + + @Override + public Object getPayload() { + return payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } + +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java new file mode 100644 index 0000000000..7e5efda362 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java @@ -0,0 +1,337 @@ +package org.opendaylight.controller.sal.connector.zeromq; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.*; +import java.util.concurrent.*; + +public class ZeroMqRpcRouter implements RpcRouter { + + private ExecutorService serverPool; + private static ExecutorService handlersPool; + + private Map, String> routingTable; + + private ProviderSession brokerSession; + + private ZMQ.Context context; + private ZMQ.Socket publisher; + private ZMQ.Socket subscriber; + private ZMQ.Socket replySocket; + + private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter(); + + private final RpcFacade facade = new RpcFacade(); + private final RpcListener listener = new RpcListener(); + + private String pubPort = System.getProperty("pub.port");//port on which announcements are sent + private String subPort = System.getProperty("sub.port");//other controller's pub port + private String pubIp = System.getProperty("pub.ip"); //other controller's ip + private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received + + + private ZeroMqRpcRouter() { + } + + public static ZeroMqRpcRouter getInstance() { + return _instance; + } + + public void start() { + context = ZMQ.context(2); + serverPool = Executors.newSingleThreadExecutor(); + handlersPool = Executors.newCachedThreadPool(); + routingTable = new ConcurrentHashMap, String>(); + + // Start listening for announce and rpc messages + serverPool.execute(receive()); + + + brokerSession.addRpcRegistrationListener(listener); + Set currentlySupported = brokerSession.getSupportedRpcs(); + for(QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); + } + + + } + + public void stop() { + if (handlersPool != null) handlersPool.shutdown(); + if (serverPool != null) serverPool.shutdown(); + if (publisher != null) { + publisher.setLinger(0); + publisher.close(); + } + if (replySocket != null) { + replySocket.setLinger(0); + replySocket.close(); + } + if (subscriber != null) { + subscriber.setLinger(0); + subscriber.close(); + } + if (context != null) context.term(); + + + } + + private Runnable receive() { + return new Runnable() { + public void run() { + try { + // Bind to RPC reply socket + replySocket = context.socket(ZMQ.REP); + replySocket.bind("tcp://*:" + rpcPort); + + // Bind to publishing controller + subscriber = context.socket(ZMQ.SUB); + subscriber.connect("tcp://" + pubIp + ":" + subPort); + System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]"); + + subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE)); + + // Initialize poll set + ZMQ.Poller poller = new ZMQ.Poller(2); + poller.register(replySocket, ZMQ.Poller.POLLIN); + poller.register(subscriber, ZMQ.Poller.POLLIN); + + while (!Thread.currentThread().isInterrupted()) { + + poller.poll(250); + //TODO: Fix this + if (poller.pollin(0)) { + //receive rpc request and reply + try { + Message req = parseMessage(replySocket); + Message resp = new Message(); + //Call broker to process the message then reply + Future> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload()); + RpcResult result = rpc.get(); + resp.setType(Message.MessageType.RESPONSE); + resp.setSender(getLocalIpAddress() + ":" + rpcPort); + resp.setRoute(req.getRoute()); + resp.setPayload(result.isSuccessful()); + replySocket.send(Message.serialize(resp)); + + } catch (IOException ex) {// | ClassNotFoundException ex) { + System.out.println("Rpc request could not be handled" + ex); + } + } + if (poller.pollin(1)) { + //get subscription and update routing table + //try { + Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv()); + System.out.println("Topic:[" + topic + "]"); + + if (subscriber.hasReceiveMore()) { + try { + Message m = (Message) Message.deserialize(subscriber.recv()); + System.out.println(m); + //TODO: check on msg type or topic. Both should be same. Need to normalize. + if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +// + } + } + } catch (Exception e) { + e.printStackTrace(); + } + replySocket.setLinger(0); + replySocket.close(); + subscriber.setLinger(0); + subscriber.close(); + } + }; + } + + private void updateRoutingTable(Message msg) { + routingTable.put(msg.getRoute(), msg.getSender()); + RpcRouter.RouteIdentifier route = msg.getRoute(); + QName rpcType = route.getType(); + System.out.println("Routing Table\n" + routingTable); + + RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade); + } + + private Message parseMessage(ZMQ.Socket socket) { + //Message m = new Message(); + //socket.setReceiveBufferSize(40000); + Message msg = null; + try { + byte[] bytes = socket.recv(); + System.out.println("Received bytes:[" + bytes.length + "]"); + msg = (Message) Message.deserialize(bytes); + } catch (Throwable t) { + System.out.println("Caught exception"); + t.printStackTrace(); + } + return msg; + /*m.setType((Message.MessageType) Message.deserialize(socket.recv())); + + if (socket.hasReceiveMore()) { + m.setSender((String) Message.deserialize(socket.recv())); + } + if (socket.hasReceiveMore()) { + m.setRoute((RouteIdentifier) Message.deserialize(socket.recv())); + } + if (socket.hasReceiveMore()) { + m.setPayload(Message.deserialize(socket.recv())); + } + return m;*/ + } + + @Override + public Future> sendRpc(final RpcRequest input) { + + return handlersPool.submit(new Callable>() { + + @Override + public RpcReply call() { + ZMQ.Socket requestSocket = context.socket(ZMQ.REQ); + Message req = new Message(); + Message resp = null; + RpcReplyImpl reply = new RpcReplyImpl(); + requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute())); + + req.setType(Message.MessageType.REQUEST); + req.setSender(getLocalIpAddress() + ":" + rpcPort); + req.setRoute(input.getRoutingInformation()); + req.setPayload(input.getPayload()); + try { + requestSocket.send(Message.serialize(req)); + resp = parseMessage(requestSocket); + reply.setPayload(resp.getPayload()); + } catch (IOException ex) {//| ClassNotFoundException ex) { + //Log and ignore + System.out.println("Error in RPC send. Input could not be serialized[" + input + "]"); + } + + return reply; + } + }); + } + + public void publish(final Message message) { + Runnable task = new Runnable() { + public void run() { + // Bind to publishing port + publisher = context.socket(ZMQ.PUB); + publisher.bind("tcp://*:" + pubPort); + System.out.println("Publisher started at port[" + pubPort + "]"); + try { + Message outMessage = new Message(); + outMessage.setType(Message.MessageType.ANNOUNCE); + outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort); + outMessage.setRoute(message.getRoute()); + + System.out.println("Sending announcement[" + outMessage + "]"); + publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE)); + publisher.send(Message.serialize(outMessage)); + + } catch (IOException ex) { + //Log and ignore + System.out.println("Error in publishing"); + ex.printStackTrace(); + } + System.out.println("Published message[" + message + "]"); + publisher.close(); + } + }; + handlersPool.execute(task); + } + + private String getLocalIpAddress() { + String hostAddress = null; + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + e1.printStackTrace(); + } + while (e.hasMoreElements()) { + + NetworkInterface n = (NetworkInterface) e.nextElement(); + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) + hostAddress = i.getHostAddress(); + } + } + + return hostAddress; + + + } + + + private class RpcFacade implements RpcImplementation { + + + @Override + public Set getSupportedRpcs() { + return Collections.emptySet(); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + + RpcRequestImpl request = new RpcRequestImpl(); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setContext(null); + routeId.setRoute(null); + routeId.setType(rpc); + + request.setRouteIdentifier(routeId); + request.setPayload(input); + // Create message + + Future> ret = sendRpc(request); + + return null; + } + } + + private class RpcListener implements RpcRegistrationListener { + + @Override + public void onRpcImplementationAdded(QName name) { + + Message msg = new Message(); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + msg.setRoute(routeId); + publish(msg); + } + + @Override + public void onRpcImplementationRemoved(QName name) { + // TODO Auto-generated method stub + + } + } + + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + + } + +} diff --git a/opendaylight/md-sal/test/pom.xml b/opendaylight/md-sal/test/pom.xml new file mode 100644 index 0000000000..f9e500ea2b --- /dev/null +++ b/opendaylight/md-sal/test/pom.xml @@ -0,0 +1,24 @@ + + 4.0.0 + + sal-parent + 1.0-SNAPSHOT + org.opendaylight.controller + + pom + org.opendaylight.controller.tests + sal-test-parent + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL + + + + zeromq-test-consumer + zeromq-test-it + zeromq-test-provider + + + diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml b/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml new file mode 100644 index 0000000000..7c6bc21b46 --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml @@ -0,0 +1,85 @@ + + 4.0.0 + + sal-test-parent + org.opendaylight.controller.tests + 1.0-SNAPSHOT + + zeromq-test-consumer + bundle + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL + + + + + + + org.apache.felix + maven-bundle-plugin + + + org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer + + org.opendaylight.controller.sal.core.api, + org.opendaylight.yangtools.yang.common;version="[0.5,1)", + org.opendaylight.yangtools.yang.data.api, + + + + + + + + + + org.opendaylight.controller + sal-binding-api + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-core-api + 1.0-SNAPSHOT + + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.yangtools + yang-data-api + + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java b/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java new file mode 100644 index 0000000000..a56a7dedff --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java @@ -0,0 +1,51 @@ +package org.opendaylight.controller.sample.zeromq.consumer; + +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.opendaylight.controller.sal.core.api.AbstractConsumer; +import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.osgi.framework.BundleContext; + +public class ExampleConsumer extends AbstractConsumer { + + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace,"heartbeat"); + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + private ConsumerSession session; + + + @Override + public void onSessionInitiated(ConsumerSession session) { + this.session = session; + executor.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + int count = 0; + try { + Future> future = ExampleConsumer.this.session.rpc(QNAME, null); + RpcResult result = future.get(); + System.out.println("Result received. Status is :" + result.isSuccessful()); + } catch (Exception e) { + e.printStackTrace(); + } + + } + }, 0, 10, TimeUnit.SECONDS); + } + + @Override + protected void stopImpl(BundleContext context) { + // TODO Auto-generated method stub + super.stopImpl(context); + executor.shutdown(); + } +} diff --git a/opendaylight/md-sal/test/zeromq-test-it/pom.xml b/opendaylight/md-sal/test/zeromq-test-it/pom.xml new file mode 100644 index 0000000000..56945d1d34 --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-it/pom.xml @@ -0,0 +1,184 @@ + + 4.0.0 + + sal-test-parent + org.opendaylight.controller.tests + 1.0-SNAPSHOT + + zeromq-test-it + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL + + + + 3.0.0 + 1.5.0 + + + + + + org.ops4j.pax.exam + maven-paxexam-plugin + 1.2.4 + + + generate-config + + generate-depends-file + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + + org.ops4j.pax.exam + + + maven-paxexam-plugin + + + [1.2.4,) + + + + generate-depends-file + + + + + + + + + + + + + + + + + + org.opendaylight.yangtools.thirdparty + xtend-lib-osgi + 2.4.3 + + + org.opendaylight.controller.tests + zeromq-test-provider + 1.0-SNAPSHOT + + + org.opendaylight.controller.tests + zeromq-test-consumer + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-broker-impl + 1.0-SNAPSHOT + + + org.ops4j.pax.exam + pax-exam-container-native + ${exam.version} + test + + + org.ops4j.pax.exam + pax-exam-junit4 + ${exam.version} + test + + + org.ops4j.pax.exam + pax-exam-link-mvn + ${exam.version} + test + + + equinoxSDK381 + org.eclipse.osgi + 3.8.1.v20120830-144521 + test + + + org.slf4j + log4j-over-slf4j + 1.7.2 + + + ch.qos.logback + logback-core + 1.0.9 + + + ch.qos.logback + logback-classic + 1.0.9 + + + org.opendaylight.controller + sal-binding-api + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-core-api + 1.0-SNAPSHOT + + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.yangtools + yang-data-api + + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java new file mode 100644 index 0000000000..c17b143d70 --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java @@ -0,0 +1,75 @@ +package org.opendaylight.controller.sample.zeromq.test.it; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.ops4j.pax.exam.Configuration; +import org.ops4j.pax.exam.Option; +import org.ops4j.pax.exam.junit.PaxExam; +import org.osgi.framework.BundleContext; + +import javax.inject.Inject; + +import static org.junit.Assert.assertTrue; +import static org.ops4j.pax.exam.CoreOptions.*; + +@RunWith(PaxExam.class) +public class ServiceConsumerController { + + public static final String ODL = "org.opendaylight.controller"; + public static final String YANG = "org.opendaylight.yangtools"; + public static final String SAMPLE = "org.opendaylight.controller.samples"; + + @Test + public void properInitialized() throws Exception { + + Thread.sleep(30000); // Waiting for services to get wired. + assertTrue(true); + //assertTrue(consumer.createToast(WhiteBread.class, 5)); + + } + +// @Inject +// BindingAwareBroker broker; + +// @Inject +// ToastConsumer consumer; + + @Inject + BundleContext ctx; + + @Configuration + public Option[] config() { + return options(systemProperty("osgi.console").value("2401"), + systemProperty("pub.port").value("5557"), + systemProperty("sub.port").value("5556"), + systemProperty("rpc.port").value("5555"), + systemProperty("pub.ip").value("localhost"), + mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), // + mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), // + mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), // + mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), // + + //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), // + mavenBundle(ODL, "sal-common").versionAsInProject(), // + mavenBundle(ODL, "sal-common-api").versionAsInProject(),// + mavenBundle(ODL, "sal-common-impl").versionAsInProject(), // + mavenBundle(ODL, "sal-common-util").versionAsInProject(), // + mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), // + mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), // + mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), // + mavenBundle(ODL, "sal-connector-api").versionAsInProject(), // + mavenBundle(SAMPLE, "zeromq-test-consumer").versionAsInProject(), // + mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), // + mavenBundle(YANG, "concepts").versionAsInProject(), + mavenBundle(YANG, "yang-binding").versionAsInProject(), // + mavenBundle(YANG, "yang-common").versionAsInProject(), // + mavenBundle(YANG, "yang-data-api").versionAsInProject(), // + mavenBundle(YANG, "yang-model-api").versionAsInProject(), // + mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), // + mavenBundle("com.google.guava", "guava").versionAsInProject(), // + mavenBundle("org.jeromq", "jeromq").versionAsInProject(), + junitBundles() + ); + } + +} diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java new file mode 100644 index 0000000000..2d28b0b5b5 --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java @@ -0,0 +1,86 @@ +package org.opendaylight.controller.sample.zeromq.test.it; + +import static org.junit.Assert.*; +import static org.ops4j.pax.exam.CoreOptions.junitBundles; +import static org.ops4j.pax.exam.CoreOptions.mavenBundle; +import static org.ops4j.pax.exam.CoreOptions.options; +import static org.ops4j.pax.exam.CoreOptions.systemPackages; +import static org.ops4j.pax.exam.CoreOptions.systemProperty; +import static org.ops4j.pax.exam.CoreOptions.maven; + +import java.util.Collection; + +import javax.inject.Inject; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.ops4j.pax.exam.Configuration; +import org.ops4j.pax.exam.CoreOptions; +import org.ops4j.pax.exam.Option; +import org.ops4j.pax.exam.junit.PaxExam; +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; + +@RunWith(PaxExam.class) +public class ServiceProviderController { + + public static final String ODL = "org.opendaylight.controller"; + public static final String YANG = "org.opendaylight.yangtools"; + public static final String SAMPLE = "org.opendaylight.controller.samples"; + + @Test + public void properInitialized() throws Exception { + + Thread.sleep(30000); // Waiting for services to get wired. + assertTrue(true); + //assertTrue(consumer.createToast(WhiteBread.class, 5)); + + } + +// @Inject +// BindingAwareBroker broker; + +// @Inject +// ToastConsumer consumer; + + @Inject + BundleContext ctx; + + @Configuration + public Option[] config() { + return options(systemProperty("osgi.console").value("2401"), + systemProperty("pub.port").value("5556"), + systemProperty("sub.port").value("5557"), + systemProperty("rpc.port").value("5554"), + systemProperty("pub.ip").value("localhost"), + mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), // + mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), // + mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), // + mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), // + + //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), // + mavenBundle(ODL, "sal-common").versionAsInProject(), // + mavenBundle(ODL, "sal-common-api").versionAsInProject(),// + mavenBundle(ODL, "sal-common-impl").versionAsInProject(), // + mavenBundle(ODL, "sal-common-util").versionAsInProject(), // + mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), // + mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), // + mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), // + mavenBundle(ODL, "sal-connector-api").versionAsInProject(), // + mavenBundle(SAMPLE, "zeromq-test-provider").versionAsInProject(), // + mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), // + mavenBundle(YANG, "concepts").versionAsInProject(), + mavenBundle(YANG, "yang-binding").versionAsInProject(), // + mavenBundle(YANG, "yang-common").versionAsInProject(), // + mavenBundle(YANG, "yang-data-api").versionAsInProject(), // + mavenBundle(YANG, "yang-model-api").versionAsInProject(), // + mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), // + mavenBundle("com.google.guava", "guava").versionAsInProject(), // + mavenBundle("org.jeromq", "jeromq").versionAsInProject(), + junitBundles() + ); + } + +} diff --git a/opendaylight/md-sal/test/zeromq-test-provider/pom.xml b/opendaylight/md-sal/test/zeromq-test-provider/pom.xml new file mode 100644 index 0000000000..10e15aa917 --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-provider/pom.xml @@ -0,0 +1,86 @@ + + 4.0.0 + + sal-test-parent + org.opendaylight.controller.tests + 1.0-SNAPSHOT + + zeromq-test-provider + bundle + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL + + + + + + + org.apache.felix + maven-bundle-plugin + + + org.opendaylight.controller.sample.zeromq.provider.ExampleProvider + + + + + + + + + org.opendaylight.controller + sal-binding-api + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-core-api + 1.0-SNAPSHOT + + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.yangtools + yang-data-api + + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-zeromq-connector + 1.0-SNAPSHOT + + + + diff --git a/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java new file mode 100644 index 0000000000..ec7d7a8285 --- /dev/null +++ b/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java @@ -0,0 +1,67 @@ +package org.opendaylight.controller.sample.zeromq.provider; + +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.api.AbstractProvider; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.osgi.framework.BundleContext; + +public class ExampleProvider extends AbstractProvider implements RpcImplementation { + + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace,"heartbeat"); + private RpcRegistration reg; + + + @Override + public void onSessionInitiated(ProviderSession session) { + //Adding heartbeat 10 times just to make sure subscriber get it + for (int i=0;i<10;i++){ + System.out.println("ExampleProvider: Adding " + QNAME + " " + i); + reg = session.addRpcImplementation(QNAME, this); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + @Override + public Set getSupportedRpcs() { + return Collections.singleton(QNAME); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + if(QNAME.equals(rpc)) { + RpcResult output = Rpcs.getRpcResult(true, null, Collections.emptySet()); + return output; + } + RpcResult output = Rpcs.getRpcResult(false, null, Collections.emptySet()); + return output; + } + + @Override + protected void stopImpl(BundleContext context) { + if(reg != null) { + try { + reg.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + +} -- 2.36.6