From: Abhishek Kumar Date: Wed, 6 Nov 2013 22:57:57 +0000 (-0800) Subject: Updating ZeroMQ connector implementation. Its a work inprogress. The current implemen... X-Git-Tag: jenkins-controller-bulk-release-prepare-only-2-1~444^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ebc3c137622733a95a9478742ceccf301d91e466 Updating ZeroMQ connector implementation. Its a work inprogress. The current implementation only handles announcement and rpc routing for global servies. Notification and Identity based routing will be added. Details about the implementation would be available at https://wiki.opendaylight.org/view/Zeromq_connector Signed-off-by: Abhishek Kumar Change-Id: I218fa7a99cfe4bce94f5959b47d835a88cb76b92 --- diff --git a/opendaylight/md-sal/sal-zeromq-connector/pom.xml b/opendaylight/md-sal/sal-zeromq-connector/pom.xml index 3db4a65840..72e49be4de 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/pom.xml +++ b/opendaylight/md-sal/sal-zeromq-connector/pom.xml @@ -1,96 +1,150 @@ - 4.0.0 - - org.opendaylight.controller - sal-parent - 1.0-SNAPSHOT - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.0-SNAPSHOT + + sal-zeromq-connector + bundle - sal-zeromq-connector - bundle + + 2.10.3 + - - - - org.apache.felix - maven-bundle-plugin - ${bundle.plugin.version} - true - - - - org.opendaylight.controller.sal.connector.api, - org.opendaylight.controller.sal.core.api, - org.opendaylight.yangtools.concepts;version="[0.1,1)", - org.opendaylight.yangtools.yang.common;version="[0.5,1)", - org.opendaylight.yangtools.yang.data.api;version="[0.5,1)", - org.zeromq;version="[0.3,1)" - - org.opendaylight.controller.sal.connector.zeromq.Activator - - - - - - - - org.opendaylight.controller - containermanager - 0.5.1-SNAPSHOT - - - org.opendaylight.controller - commons.northbound - 0.4.1-SNAPSHOT - - - org.opendaylight.controller - sal - 0.5.1-SNAPSHOT - - - org.opendaylight.yangtools - yang-binding - - - org.opendaylight.yangtools - yang-common - - - org.opendaylight.controller - sal-connector-api - - - org.opendaylight.controller - sal-common-util - 1.0-SNAPSHOT - + + + + org.apache.felix + maven-bundle-plugin + ${bundle.plugin.version} + true + + + + org.opendaylight.controller.sal.connector.api, + org.opendaylight.controller.sal.core.api, + org.opendaylight.yangtools.concepts;version="[0.1,1)", + org.opendaylight.yangtools.yang.common;version="[0.5,1)", + org.opendaylight.yangtools.yang.data.api;version="[0.5,1)", + org.zeromq;version="[0.3,1)" + + org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Activator + + + - - junit - junit - - - org.jeromq - jeromq - 0.3.0-SNAPSHOT - + + net.alchim31.maven + scala-maven-plugin + 3.1.6 + + incremental + + -target:jvm-1.7 + + + -source1.7 + -target1.7 + + + + + scala-compile + + compile + + + + scala-test-compile + + testCompile + + + - - - - sonatype-nexus-snapshots - https://oss.sonatype.org/content/repositories/snapshots - - false - - - true - - - + + + maven-compiler-plugin + + + default-compile + none + + + default-testCompile + none + + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + org.opendaylight.controller + commons.northbound + 0.4.1-SNAPSHOT + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.controller + sal-connector-api + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + + junit + junit + + + org.jeromq + jeromq + 0.3.0-SNAPSHOT + + + + + + sonatype-nexus-snapshots + https://oss.sonatype.org/content/repositories/snapshots + + false + + + true + + + diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java new file mode 100644 index 0000000000..ba90f3705f --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.connector.remoterpc.api; + +import java.util.Map; +import java.util.Set; + +public interface RouteChange { + + Map> getRemovals(); + Map> getAnnouncements(); +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChangeListener.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChangeListener.java new file mode 100644 index 0000000000..701cfaf0f5 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChangeListener.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.connector.remoterpc.api; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +import java.util.EventListener; + +public interface RouteChangeListener extends EventListener { + + public void onRouteChanged(RouteChange, String> change); +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java new file mode 100644 index 0000000000..3c6c42e5c7 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.connector.remoterpc.api; + +import java.util.Set; + +public interface RoutingTable { + + /** + * Adds a network address for the route. If address for route + * exists, appends the address to the list + * + * @param routeId route identifier + * @param route network address + */ + public void addRoute(I routeId, R route); + + /** + * Adds a network address for the route. If the route already exists, + * it throws. This method would be used when registering a global service. + * + * @param routeId route identifier + * @param route network address + * @throws DuplicateRouteException + */ + public void addGlobalRoute(I routeId, R route) throws DuplicateRouteException; + + /** + * Removes the network address for the route from routing table. If only + * one network address existed, remove the route as well. + * @param routeId + * @param route + */ + public void removeRoute(I routeId, R route); + + /** + * Returns a set of network addresses associated with this route + * @param routeId + * @return + */ + public Set getRoutes(I routeId); + + /** + * Returns only one address from the list of network addresses + * associated with the route. The algorithm to determine that + * one address is upto the implementer + * @param route + * @return + */ + public R getARoute(I routeId); + + public void registerRouteChangeListener(RouteChangeListener listener); + + public class DuplicateRouteException extends Exception {} +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java similarity index 88% rename from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java rename to opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java index b8933ec615..7468815530 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java @@ -1,4 +1,4 @@ -package org.opendaylight.controller.sal.connector.zeromq; +package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; import org.opendaylight.controller.sal.core.api.AbstractProvider; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java similarity index 70% rename from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java rename to opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java index dd87646ed6..500f7d562b 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java @@ -1,16 +1,15 @@ -package org.opendaylight.controller.sal.connector.zeromq; +package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; import org.codehaus.jackson.map.ObjectMapper; import org.opendaylight.controller.sal.connector.api.RpcRouter; import java.io.*; -import java.util.Arrays; public class Message implements Serializable { - public enum MessageType { - ANNOUNCE((byte) 0), + public static enum MessageType { + ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration HEARTBEAT((byte) 1), REQUEST((byte) 2), RESPONSE((byte) 3); @@ -101,27 +100,15 @@ public class Message implements Serializable { return o.readObject(); } - public static byte[] toJsonBytes(Message m){ + public static byte[] toJsonBytes(Message m) throws IOException { ObjectMapper o = new ObjectMapper(); - try { - System.out.println(o.writeValueAsString(m)); - return o.writeValueAsBytes(m); - } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - return null; + return o.writeValueAsBytes(m); } - public static Message fromJsonBytes(byte [] bytes){ + public static Message fromJsonBytes(byte [] bytes) throws IOException { ObjectMapper o = new ObjectMapper(); - Message m = null; - try { - m = o.readValue(bytes, Message.class); - } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - return m; + return o.readValue(bytes, Message.class); } public static class Response extends Message implements RpcRouter.RpcReply { @@ -146,5 +133,41 @@ public class Message implements Serializable { } } + /** + * Builds a {@link Message} object + */ + public static class MessageBuilder{ + + private Message message; + + public MessageBuilder(){ + message = new Message(); + } + + + public MessageBuilder type(MessageType type){ + message.setType(type); + return this; + } + + public MessageBuilder sender(String sender){ + message.setSender(sender); + return this; + } + + public MessageBuilder route(RpcRouter.RouteIdentifier route){ + message.setRoute(route); + return this; + } + + public MessageBuilder payload(Object obj){ + message.setPayload(obj); + return this; + } + + public Message build(){ + return message; + } + } } diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java similarity index 93% rename from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java rename to opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java index 8eab01b1fb..fff7fad972 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java @@ -1,4 +1,4 @@ -package org.opendaylight.controller.sal.connector.zeromq; +package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.QName; diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RpcRequestImpl.java similarity index 93% rename from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java rename to opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RpcRequestImpl.java index 2361ab76c3..40c5fa10a7 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RpcRequestImpl.java @@ -1,4 +1,4 @@ -package org.opendaylight.controller.sal.connector.zeromq; +package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.QName; diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java new file mode 100644 index 0000000000..acb733d5cb --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java @@ -0,0 +1,448 @@ +package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.zeromq.ZMQ; + +/** + * ZeroMq based implementation of RpcRouter + * TODO: + * 1. Make it multi VM aware + * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe + * 3. sendRpc() should use connection pooling + * 4. Read properties from config file using existing(?) ODL properties framework + */ +public class ZeroMqRpcRouter implements RpcRouter { + + private ExecutorService serverPool; + private static ExecutorService handlersPool; + + private Map, String> routingTable; + + private ProviderSession brokerSession; + + private ZMQ.Context context; + private ZMQ.Socket publisher; + private ZMQ.Socket subscriber; + private ZMQ.Socket replySocket; + + private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter(); + + private final RpcFacade facade = new RpcFacade(); + private final RpcListener listener = new RpcListener(); + + private final String localIp = getLocalIpAddress(); + + private String pubPort = System.getProperty("pub.port");// port on which announcements are sent + private String subPort = System.getProperty("sub.port");// other controller's pub port + private String pubIp = System.getProperty("pub.ip"); // other controller's ip + private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received + + //Prevent instantiation + private ZeroMqRpcRouter() { + } + + public static ZeroMqRpcRouter getInstance() { + return _instance; + } + + public void start() { + context = ZMQ.context(2); + publisher = context.socket(ZMQ.PUB); + int ret = publisher.bind("tcp://*:" + pubPort); + System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]"); + // serverPool = Executors.newSingleThreadExecutor(); + serverPool = Executors.newCachedThreadPool(); + handlersPool = Executors.newCachedThreadPool(); + routingTable = new ConcurrentHashMap, String>(); + + // Start listening for announce and rpc messages + serverPool.execute(receive()); + + brokerSession.addRpcRegistrationListener(listener); + + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); + } + + } + + public void stop() { + if (handlersPool != null) + handlersPool.shutdown(); + if (serverPool != null) + serverPool.shutdown(); + if (publisher != null) { + publisher.setLinger(0); + publisher.close(); + } + if (replySocket != null) { + replySocket.setLinger(0); + replySocket.close(); + } + if (subscriber != null) { + subscriber.setLinger(0); + subscriber.close(); + } + if (context != null) + context.term(); + + } + + private Runnable receive() { + return new Runnable() { + public void run() { + try { + // Bind to RPC reply socket + replySocket = context.socket(ZMQ.REP); + replySocket.bind("tcp://*:" + rpcPort); + + // Bind to publishing controller + subscriber = context.socket(ZMQ.SUB); + subscriber.connect("tcp://" + pubIp + ":" + subPort); + System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://" + + pubIp + ":" + subPort + "]"); + + //subscribe for announcements + //TODO: Message type would be changed. Update this + subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE)); + + // Poller enables listening on multiple sockets using a single thread + ZMQ.Poller poller = new ZMQ.Poller(2); + poller.register(replySocket, ZMQ.Poller.POLLIN); + poller.register(subscriber, ZMQ.Poller.POLLIN); + System.out.println(Thread.currentThread().getName() + "-Start Polling"); + + //TODO: Add code to restart the thread after exception + while (!Thread.currentThread().isInterrupted()) { + + poller.poll(); + + if (poller.pollin(0)) { + handleRpcCall(); + } + if (poller.pollin(1)) { + handleAnnouncement(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + replySocket.setLinger(0); + replySocket.close(); + subscriber.setLinger(0); + subscriber.close(); + } + }; + } + + /** + * @throws IOException + * @throws ClassNotFoundException + */ + private void handleAnnouncement() throws IOException, ClassNotFoundException { + System.out.println("\n" + Thread.currentThread().getName() + "-Received message"); + Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv()); + System.out.println("Topic:[" + topic + "]"); + + if (subscriber.hasReceiveMore()) { + try { + Message m = (Message) Message.deserialize(subscriber.recv()); + System.out.println(m); + // TODO: check on msg type or topic. Both + // should be same. Need to normalize. + if (Message.MessageType.ANNOUNCE == m.getType()) + updateRoutingTable(m); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + + } + + /** + * @throws InterruptedException + * @throws ExecutionException + */ + private void handleRpcCall() throws InterruptedException, ExecutionException { + try { + Message req = parseMessage(replySocket); + + System.out.println("Received RPC request [" + req + "]"); + + // Call broker to process the message then reply + Future> rpc = brokerSession.rpc( + (QName) req.getRoute().getType(), (CompositeNode) req.getPayload()); + + RpcResult result = rpc.get(); + + Message response = new Message.MessageBuilder() + .type(MessageType.RESPONSE) + .sender(localIp + ":" + rpcPort) + .route(req.getRoute()) + //.payload(result) TODO: enable and test + .build(); + + replySocket.send(Message.serialize(response)); + + System.out.println("Sent RPC response [" + response + "]"); + + } catch (IOException ex) { + //TODO: handle exception and send error codes to caller + System.out.println("Rpc request could not be handled" + ex); + } + } + + + @Override + public Future> sendRpc( + final RpcRequest input) { + + return handlersPool.submit(new Callable>() { + + @Override + public RpcReply call() { + ZMQ.Socket requestSocket = context.socket(ZMQ.REQ); + + // TODO pick the ip and port from routing table based on routing identifier + requestSocket.connect("tcp://" + pubIp + ":5554"); + + Message requestMessage = new Message.MessageBuilder() + .type(MessageType.REQUEST) + .sender(localIp + ":" + rpcPort) + .route(input.getRoutingInformation()) + .payload(input.getPayload()) + .build(); + + RpcReply reply = null; + + try { + System.out.println("\n\nRPC Request [" + requestMessage + "]"); + + requestSocket.send(Message.serialize(requestMessage)); + final Message resp = parseMessage(requestSocket); + + System.out.println("\n\nRPC Response [" + resp + "]"); + + reply = new RpcReply() { + + @Override + public Object getPayload() { + return resp.getPayload(); + } + }; + } catch (IOException ex) { + // TODO: Pass exception back to the caller + System.out.println("Error in RPC send. Input could not be serialized[" + input + "]"); + } + + return reply; + } + }); + } + + /** + * TODO: Remove this implementation and use RoutingTable implementation to send announcements + * Publishes a notice to other controllers in the cluster + * + * @param notice + */ + public void publish(final Message notice) { + Runnable task = new Runnable() { + public void run() { + + System.out.println( + Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]"); + + try { + + System.out.println( + Thread.currentThread().getName() + "-Sending announcement[" + notice + "]"); + + publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE)); + publisher.send(Message.serialize(notice)); + + } catch (IOException ex) { + System.out.println("Error in publishing"); + ex.printStackTrace(); + } + System.out.println(Thread.currentThread().getName() + "-Published message[" + notice + + "]"); + + } + }; + handlersPool.execute(task); + } + + /** + * Finds IPv4 address of the local VM + * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which + * address will be returned. Read IP from a property file or enhance the code to make it deterministic. + * Should we use IP or hostname? + * + * @return + */ + private String getLocalIpAddress() { + String hostAddress = null; + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + e1.printStackTrace(); + } + while (e.hasMoreElements()) { + + NetworkInterface n = (NetworkInterface) e.nextElement(); + + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) + hostAddress = i.getHostAddress(); + } + } + return hostAddress; + + } + + /** + * TODO: Change to use external routing table implementation + * + * @param msg + */ + private void updateRoutingTable(Message msg) { + routingTable.put(msg.getRoute(), msg.getSender()); + RpcRouter.RouteIdentifier route = msg.getRoute(); + + // Currently only registers rpc implementation. + // TODO: do registration for instance based routing + QName rpcType = route.getType(); + RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade); + } + + /** + * @param socket + * @return + */ + private Message parseMessage(ZMQ.Socket socket) { + + Message msg = null; + try { + byte[] bytes = socket.recv(); + System.out.println("Received bytes:[" + bytes.length + "]"); + msg = (Message) Message.deserialize(bytes); + } catch (Throwable t) { + System.out.println("Caught exception"); + t.printStackTrace(); + } + return msg; + } + + private class RpcFacade implements RpcImplementation { + + @Override + public Set getSupportedRpcs() { + return Collections.emptySet(); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + RpcRequestImpl request = new RpcRequestImpl(); + request.setRouteIdentifier(routeId); + request.setPayload(input); + + final Future> ret = sendRpc(request); + + //TODO: Review result handling + RpcResult result = new RpcResult() { + @Override + public boolean isSuccessful() { + try { + ret.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + return false; + } + return true; + } + + @Override + public CompositeNode getResult() { + return null; + } + + @Override + public Collection getErrors() { + return Collections.EMPTY_LIST; + } + }; + return result; + } + } + + /** + * Listener for rpc registrations + */ + private class RpcListener implements RpcRegistrationListener { + + @Override + public void onRpcImplementationAdded(QName name) { + System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()"); + + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + //TODO: Make notice immutable and change message type + Message notice = new Message.MessageBuilder() + .type(MessageType.ANNOUNCE) + .sender("tcp://" + localIp + ":" + rpcPort) + .route(routeId) + .build(); + + publish(notice); + } + + @Override + public void onRpcImplementationRemoved(QName name) { + // TODO: send a rpc-deregistrtation notice + + } + } + + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + + } + +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java deleted file mode 100644 index 66ff7148a5..0000000000 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.opendaylight.controller.sal.connector.zeromq; - -import org.opendaylight.controller.sal.connector.api.RpcRouter; - -import java.io.Serializable; - -/** - * Created with IntelliJ IDEA. - * User: abhishk2 - * Date: 10/24/13 - * Time: 4:25 PM - * To change this template use File | Settings | File Templates. - */ -public class RpcReplyImpl implements RpcRouter.RpcReply,Serializable { - - private Object payload; - - @Override - public Object getPayload() { - return payload; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setPayload(Object payload){ - this.payload = payload; - } -} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java deleted file mode 100644 index 7e5efda362..0000000000 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java +++ /dev/null @@ -1,337 +0,0 @@ -package org.opendaylight.controller.sal.connector.zeromq; - -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.zeromq.ZMQ; - -import java.io.IOException; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.*; -import java.util.concurrent.*; - -public class ZeroMqRpcRouter implements RpcRouter { - - private ExecutorService serverPool; - private static ExecutorService handlersPool; - - private Map, String> routingTable; - - private ProviderSession brokerSession; - - private ZMQ.Context context; - private ZMQ.Socket publisher; - private ZMQ.Socket subscriber; - private ZMQ.Socket replySocket; - - private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter(); - - private final RpcFacade facade = new RpcFacade(); - private final RpcListener listener = new RpcListener(); - - private String pubPort = System.getProperty("pub.port");//port on which announcements are sent - private String subPort = System.getProperty("sub.port");//other controller's pub port - private String pubIp = System.getProperty("pub.ip"); //other controller's ip - private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received - - - private ZeroMqRpcRouter() { - } - - public static ZeroMqRpcRouter getInstance() { - return _instance; - } - - public void start() { - context = ZMQ.context(2); - serverPool = Executors.newSingleThreadExecutor(); - handlersPool = Executors.newCachedThreadPool(); - routingTable = new ConcurrentHashMap, String>(); - - // Start listening for announce and rpc messages - serverPool.execute(receive()); - - - brokerSession.addRpcRegistrationListener(listener); - Set currentlySupported = brokerSession.getSupportedRpcs(); - for(QName rpc : currentlySupported) { - listener.onRpcImplementationAdded(rpc); - } - - - } - - public void stop() { - if (handlersPool != null) handlersPool.shutdown(); - if (serverPool != null) serverPool.shutdown(); - if (publisher != null) { - publisher.setLinger(0); - publisher.close(); - } - if (replySocket != null) { - replySocket.setLinger(0); - replySocket.close(); - } - if (subscriber != null) { - subscriber.setLinger(0); - subscriber.close(); - } - if (context != null) context.term(); - - - } - - private Runnable receive() { - return new Runnable() { - public void run() { - try { - // Bind to RPC reply socket - replySocket = context.socket(ZMQ.REP); - replySocket.bind("tcp://*:" + rpcPort); - - // Bind to publishing controller - subscriber = context.socket(ZMQ.SUB); - subscriber.connect("tcp://" + pubIp + ":" + subPort); - System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]"); - - subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE)); - - // Initialize poll set - ZMQ.Poller poller = new ZMQ.Poller(2); - poller.register(replySocket, ZMQ.Poller.POLLIN); - poller.register(subscriber, ZMQ.Poller.POLLIN); - - while (!Thread.currentThread().isInterrupted()) { - - poller.poll(250); - //TODO: Fix this - if (poller.pollin(0)) { - //receive rpc request and reply - try { - Message req = parseMessage(replySocket); - Message resp = new Message(); - //Call broker to process the message then reply - Future> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload()); - RpcResult result = rpc.get(); - resp.setType(Message.MessageType.RESPONSE); - resp.setSender(getLocalIpAddress() + ":" + rpcPort); - resp.setRoute(req.getRoute()); - resp.setPayload(result.isSuccessful()); - replySocket.send(Message.serialize(resp)); - - } catch (IOException ex) {// | ClassNotFoundException ex) { - System.out.println("Rpc request could not be handled" + ex); - } - } - if (poller.pollin(1)) { - //get subscription and update routing table - //try { - Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv()); - System.out.println("Topic:[" + topic + "]"); - - if (subscriber.hasReceiveMore()) { - try { - Message m = (Message) Message.deserialize(subscriber.recv()); - System.out.println(m); - //TODO: check on msg type or topic. Both should be same. Need to normalize. - if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m); - } catch (IOException | ClassNotFoundException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } -// - } - } - } catch (Exception e) { - e.printStackTrace(); - } - replySocket.setLinger(0); - replySocket.close(); - subscriber.setLinger(0); - subscriber.close(); - } - }; - } - - private void updateRoutingTable(Message msg) { - routingTable.put(msg.getRoute(), msg.getSender()); - RpcRouter.RouteIdentifier route = msg.getRoute(); - QName rpcType = route.getType(); - System.out.println("Routing Table\n" + routingTable); - - RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade); - } - - private Message parseMessage(ZMQ.Socket socket) { - //Message m = new Message(); - //socket.setReceiveBufferSize(40000); - Message msg = null; - try { - byte[] bytes = socket.recv(); - System.out.println("Received bytes:[" + bytes.length + "]"); - msg = (Message) Message.deserialize(bytes); - } catch (Throwable t) { - System.out.println("Caught exception"); - t.printStackTrace(); - } - return msg; - /*m.setType((Message.MessageType) Message.deserialize(socket.recv())); - - if (socket.hasReceiveMore()) { - m.setSender((String) Message.deserialize(socket.recv())); - } - if (socket.hasReceiveMore()) { - m.setRoute((RouteIdentifier) Message.deserialize(socket.recv())); - } - if (socket.hasReceiveMore()) { - m.setPayload(Message.deserialize(socket.recv())); - } - return m;*/ - } - - @Override - public Future> sendRpc(final RpcRequest input) { - - return handlersPool.submit(new Callable>() { - - @Override - public RpcReply call() { - ZMQ.Socket requestSocket = context.socket(ZMQ.REQ); - Message req = new Message(); - Message resp = null; - RpcReplyImpl reply = new RpcReplyImpl(); - requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute())); - - req.setType(Message.MessageType.REQUEST); - req.setSender(getLocalIpAddress() + ":" + rpcPort); - req.setRoute(input.getRoutingInformation()); - req.setPayload(input.getPayload()); - try { - requestSocket.send(Message.serialize(req)); - resp = parseMessage(requestSocket); - reply.setPayload(resp.getPayload()); - } catch (IOException ex) {//| ClassNotFoundException ex) { - //Log and ignore - System.out.println("Error in RPC send. Input could not be serialized[" + input + "]"); - } - - return reply; - } - }); - } - - public void publish(final Message message) { - Runnable task = new Runnable() { - public void run() { - // Bind to publishing port - publisher = context.socket(ZMQ.PUB); - publisher.bind("tcp://*:" + pubPort); - System.out.println("Publisher started at port[" + pubPort + "]"); - try { - Message outMessage = new Message(); - outMessage.setType(Message.MessageType.ANNOUNCE); - outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort); - outMessage.setRoute(message.getRoute()); - - System.out.println("Sending announcement[" + outMessage + "]"); - publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE)); - publisher.send(Message.serialize(outMessage)); - - } catch (IOException ex) { - //Log and ignore - System.out.println("Error in publishing"); - ex.printStackTrace(); - } - System.out.println("Published message[" + message + "]"); - publisher.close(); - } - }; - handlersPool.execute(task); - } - - private String getLocalIpAddress() { - String hostAddress = null; - Enumeration e = null; - try { - e = NetworkInterface.getNetworkInterfaces(); - } catch (SocketException e1) { - e1.printStackTrace(); - } - while (e.hasMoreElements()) { - - NetworkInterface n = (NetworkInterface) e.nextElement(); - Enumeration ee = n.getInetAddresses(); - while (ee.hasMoreElements()) { - InetAddress i = (InetAddress) ee.nextElement(); - if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) - hostAddress = i.getHostAddress(); - } - } - - return hostAddress; - - - } - - - private class RpcFacade implements RpcImplementation { - - - @Override - public Set getSupportedRpcs() { - return Collections.emptySet(); - } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - - RpcRequestImpl request = new RpcRequestImpl(); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setContext(null); - routeId.setRoute(null); - routeId.setType(rpc); - - request.setRouteIdentifier(routeId); - request.setPayload(input); - // Create message - - Future> ret = sendRpc(request); - - return null; - } - } - - private class RpcListener implements RpcRegistrationListener { - - @Override - public void onRpcImplementationAdded(QName name) { - - Message msg = new Message(); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); - msg.setRoute(routeId); - publish(msg); - } - - @Override - public void onRpcImplementationRemoved(QName name) { - // TODO Auto-generated method stub - - } - } - - public void setBrokerSession(ProviderSession session) { - this.brokerSession = session; - - } - -}