<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>sal-zeromq-connector</artifactId>
+ <packaging>bundle</packaging>
- <artifactId>sal-zeromq-connector</artifactId>
- <packaging>bundle</packaging>
+ <properties>
+ <scala.version>2.10.3</scala.version>
+ </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>${bundle.plugin.version}</version>
- <extensions>true</extensions>
- <configuration>
- <instructions>
- <Import-Package>
- org.opendaylight.controller.sal.connector.api,
- org.opendaylight.controller.sal.core.api,
- org.opendaylight.yangtools.concepts;version="[0.1,1)",
- org.opendaylight.yangtools.yang.common;version="[0.5,1)",
- org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
- org.zeromq;version="[0.3,1)"
- </Import-Package>
- <Bundle-Activator>org.opendaylight.controller.sal.connector.zeromq.Activator</Bundle-Activator>
- </instructions>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>containermanager</artifactId>
- <version>0.5.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>commons.northbound</artifactId>
- <version>0.4.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal</artifactId>
- <version>0.5.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-binding</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-connector-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-util</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>${bundle.plugin.version}</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Import-Package>
+ org.opendaylight.controller.sal.connector.api,
+ org.opendaylight.controller.sal.core.api,
+ org.opendaylight.yangtools.concepts;version="[0.1,1)",
+ org.opendaylight.yangtools.yang.common;version="[0.5,1)",
+ org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
+ org.zeromq;version="[0.3,1)"
+ </Import-Package>
+ <Bundle-Activator>org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Activator</Bundle-Activator>
+ </instructions>
+ </configuration>
+ </plugin>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jeromq</groupId>
- <artifactId>jeromq</artifactId>
- <version>0.3.0-SNAPSHOT</version>
- </dependency>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.6</version>
+ <configuration>
+ <recompileMode>incremental</recompileMode>
+ <args>
+ <arg>-target:jvm-1.7</arg>
+ </args>
+ <javacArgs>
+ <javacArg>-source</javacArg><javacArg>1.7</javacArg>
+ <javacArg>-target</javacArg><javacArg>1.7</javacArg>
+ </javacArgs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>scala-compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
- </dependencies>
- <repositories>
- <repository>
- <id>sonatype-nexus-snapshots</id>
- <url>https://oss.sonatype.org/content/repositories/snapshots</url>
- <releases>
- <enabled>false</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-compile</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>default-testCompile</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>containermanager</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.northbound</artifactId>
+ <version>0.4.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal</artifactId>
+ <version>0.5.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-binding</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-connector-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jeromq</groupId>
+ <artifactId>jeromq</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </dependency>
+
+ </dependencies>
+ <repositories>
+ <repository>
+ <id>sonatype-nexus-snapshots</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
</project>
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc.api;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RouteChange<I, R> {
+
+ Map<I, Set<R>> getRemovals();
+ Map<I, Set<R>> getAnnouncements();
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc.api;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.util.EventListener;
+
+public interface RouteChangeListener extends EventListener {
+
+ public void onRouteChanged(RouteChange<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> change);
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc.api;
+
+import java.util.Set;
+
+public interface RoutingTable<I,R> {
+
+ /**
+ * Adds a network address for the route. If address for route
+ * exists, appends the address to the list
+ *
+ * @param routeId route identifier
+ * @param route network address
+ */
+ public void addRoute(I routeId, R route);
+
+ /**
+ * Adds a network address for the route. If the route already exists,
+ * it throws. This method would be used when registering a global service.
+ *
+ * @param routeId route identifier
+ * @param route network address
+ * @throws DuplicateRouteException
+ */
+ public void addGlobalRoute(I routeId, R route) throws DuplicateRouteException;
+
+ /**
+ * Removes the network address for the route from routing table. If only
+ * one network address existed, remove the route as well.
+ * @param routeId
+ * @param route
+ */
+ public void removeRoute(I routeId, R route);
+
+ /**
+ * Returns a set of network addresses associated with this route
+ * @param routeId
+ * @return
+ */
+ public Set<R> getRoutes(I routeId);
+
+ /**
+ * Returns only one address from the list of network addresses
+ * associated with the route. The algorithm to determine that
+ * one address is upto the implementer
+ * @param route
+ * @return
+ */
+ public R getARoute(I routeId);
+
+ public void registerRouteChangeListener(RouteChangeListener listener);
+
+ public class DuplicateRouteException extends Exception {}
+}
-package org.opendaylight.controller.sal.connector.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.opendaylight.controller.sal.core.api.AbstractProvider;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-package org.opendaylight.controller.sal.connector.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.codehaus.jackson.map.ObjectMapper;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import java.io.*;
-import java.util.Arrays;
public class Message implements Serializable {
- public enum MessageType {
- ANNOUNCE((byte) 0),
+ public static enum MessageType {
+ ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration
HEARTBEAT((byte) 1),
REQUEST((byte) 2),
RESPONSE((byte) 3);
return o.readObject();
}
- public static byte[] toJsonBytes(Message m){
+ public static byte[] toJsonBytes(Message m) throws IOException {
ObjectMapper o = new ObjectMapper();
- try {
- System.out.println(o.writeValueAsString(m));
- return o.writeValueAsBytes(m);
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- return null;
+ return o.writeValueAsBytes(m);
}
- public static Message fromJsonBytes(byte [] bytes){
+ public static Message fromJsonBytes(byte [] bytes) throws IOException {
ObjectMapper o = new ObjectMapper();
- Message m = null;
- try {
- m = o.readValue(bytes, Message.class);
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- return m;
+ return o.readValue(bytes, Message.class);
}
public static class Response extends Message implements RpcRouter.RpcReply {
}
}
+ /**
+ * Builds a {@link Message} object
+ */
+ public static class MessageBuilder{
+
+ private Message message;
+
+ public MessageBuilder(){
+ message = new Message();
+ }
+
+
+ public MessageBuilder type(MessageType type){
+ message.setType(type);
+ return this;
+ }
+
+ public MessageBuilder sender(String sender){
+ message.setSender(sender);
+ return this;
+ }
+
+ public MessageBuilder route(RpcRouter.RouteIdentifier route){
+ message.setRoute(route);
+ return this;
+ }
+
+ public MessageBuilder payload(Object obj){
+ message.setPayload(obj);
+ return this;
+ }
+
+ public Message build(){
+ return message;
+ }
+ }
}
-package org.opendaylight.controller.sal.connector.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
-package org.opendaylight.controller.sal.connector.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.zeromq.ZMQ;
+
+/**
+ * ZeroMq based implementation of RpcRouter
+ * TODO:
+ * 1. Make it multi VM aware
+ * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
+ * 3. sendRpc() should use connection pooling
+ * 4. Read properties from config file using existing(?) ODL properties framework
+ */
+public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
+
+ private ExecutorService serverPool;
+ private static ExecutorService handlersPool;
+
+ private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
+
+ private ProviderSession brokerSession;
+
+ private ZMQ.Context context;
+ private ZMQ.Socket publisher;
+ private ZMQ.Socket subscriber;
+ private ZMQ.Socket replySocket;
+
+ private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
+
+ private final RpcFacade facade = new RpcFacade();
+ private final RpcListener listener = new RpcListener();
+
+ private final String localIp = getLocalIpAddress();
+
+ private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
+ private String subPort = System.getProperty("sub.port");// other controller's pub port
+ private String pubIp = System.getProperty("pub.ip"); // other controller's ip
+ private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
+
+ //Prevent instantiation
+ private ZeroMqRpcRouter() {
+ }
+
+ public static ZeroMqRpcRouter getInstance() {
+ return _instance;
+ }
+
+ public void start() {
+ context = ZMQ.context(2);
+ publisher = context.socket(ZMQ.PUB);
+ int ret = publisher.bind("tcp://*:" + pubPort);
+ System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
+ // serverPool = Executors.newSingleThreadExecutor();
+ serverPool = Executors.newCachedThreadPool();
+ handlersPool = Executors.newCachedThreadPool();
+ routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
+
+ // Start listening for announce and rpc messages
+ serverPool.execute(receive());
+
+ brokerSession.addRpcRegistrationListener(listener);
+
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
+ }
+
+ }
+
+ public void stop() {
+ if (handlersPool != null)
+ handlersPool.shutdown();
+ if (serverPool != null)
+ serverPool.shutdown();
+ if (publisher != null) {
+ publisher.setLinger(0);
+ publisher.close();
+ }
+ if (replySocket != null) {
+ replySocket.setLinger(0);
+ replySocket.close();
+ }
+ if (subscriber != null) {
+ subscriber.setLinger(0);
+ subscriber.close();
+ }
+ if (context != null)
+ context.term();
+
+ }
+
+ private Runnable receive() {
+ return new Runnable() {
+ public void run() {
+ try {
+ // Bind to RPC reply socket
+ replySocket = context.socket(ZMQ.REP);
+ replySocket.bind("tcp://*:" + rpcPort);
+
+ // Bind to publishing controller
+ subscriber = context.socket(ZMQ.SUB);
+ subscriber.connect("tcp://" + pubIp + ":" + subPort);
+ System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
+ + pubIp + ":" + subPort + "]");
+
+ //subscribe for announcements
+ //TODO: Message type would be changed. Update this
+ subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
+
+ // Poller enables listening on multiple sockets using a single thread
+ ZMQ.Poller poller = new ZMQ.Poller(2);
+ poller.register(replySocket, ZMQ.Poller.POLLIN);
+ poller.register(subscriber, ZMQ.Poller.POLLIN);
+ System.out.println(Thread.currentThread().getName() + "-Start Polling");
+
+ //TODO: Add code to restart the thread after exception
+ while (!Thread.currentThread().isInterrupted()) {
+
+ poller.poll();
+
+ if (poller.pollin(0)) {
+ handleRpcCall();
+ }
+ if (poller.pollin(1)) {
+ handleAnnouncement();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ replySocket.setLinger(0);
+ replySocket.close();
+ subscriber.setLinger(0);
+ subscriber.close();
+ }
+ };
+ }
+
+ /**
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ private void handleAnnouncement() throws IOException, ClassNotFoundException {
+ System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
+ Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
+ System.out.println("Topic:[" + topic + "]");
+
+ if (subscriber.hasReceiveMore()) {
+ try {
+ Message m = (Message) Message.deserialize(subscriber.recv());
+ System.out.println(m);
+ // TODO: check on msg type or topic. Both
+ // should be same. Need to normalize.
+ if (Message.MessageType.ANNOUNCE == m.getType())
+ updateRoutingTable(m);
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ /**
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ private void handleRpcCall() throws InterruptedException, ExecutionException {
+ try {
+ Message req = parseMessage(replySocket);
+
+ System.out.println("Received RPC request [" + req + "]");
+
+ // Call broker to process the message then reply
+ Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
+ (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+
+ RpcResult<CompositeNode> result = rpc.get();
+
+ Message response = new Message.MessageBuilder()
+ .type(MessageType.RESPONSE)
+ .sender(localIp + ":" + rpcPort)
+ .route(req.getRoute())
+ //.payload(result) TODO: enable and test
+ .build();
+
+ replySocket.send(Message.serialize(response));
+
+ System.out.println("Sent RPC response [" + response + "]");
+
+ } catch (IOException ex) {
+ //TODO: handle exception and send error codes to caller
+ System.out.println("Rpc request could not be handled" + ex);
+ }
+ }
+
+
+ @Override
+ public Future<RpcReply<Object>> sendRpc(
+ final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
+
+ return handlersPool.submit(new Callable<RpcReply<Object>>() {
+
+ @Override
+ public RpcReply<Object> call() {
+ ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
+
+ // TODO pick the ip and port from routing table based on routing identifier
+ requestSocket.connect("tcp://" + pubIp + ":5554");
+
+ Message requestMessage = new Message.MessageBuilder()
+ .type(MessageType.REQUEST)
+ .sender(localIp + ":" + rpcPort)
+ .route(input.getRoutingInformation())
+ .payload(input.getPayload())
+ .build();
+
+ RpcReply<Object> reply = null;
+
+ try {
+ System.out.println("\n\nRPC Request [" + requestMessage + "]");
+
+ requestSocket.send(Message.serialize(requestMessage));
+ final Message resp = parseMessage(requestSocket);
+
+ System.out.println("\n\nRPC Response [" + resp + "]");
+
+ reply = new RpcReply<Object>() {
+
+ @Override
+ public Object getPayload() {
+ return resp.getPayload();
+ }
+ };
+ } catch (IOException ex) {
+ // TODO: Pass exception back to the caller
+ System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
+ }
+
+ return reply;
+ }
+ });
+ }
+
+ /**
+ * TODO: Remove this implementation and use RoutingTable implementation to send announcements
+ * Publishes a notice to other controllers in the cluster
+ *
+ * @param notice
+ */
+ public void publish(final Message notice) {
+ Runnable task = new Runnable() {
+ public void run() {
+
+ System.out.println(
+ Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
+
+ try {
+
+ System.out.println(
+ Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
+
+ publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
+ publisher.send(Message.serialize(notice));
+
+ } catch (IOException ex) {
+ System.out.println("Error in publishing");
+ ex.printStackTrace();
+ }
+ System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
+ + "]");
+
+ }
+ };
+ handlersPool.execute(task);
+ }
+
+ /**
+ * Finds IPv4 address of the local VM
+ * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+ * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+ * Should we use IP or hostname?
+ *
+ * @return
+ */
+ private String getLocalIpAddress() {
+ String hostAddress = null;
+ Enumeration e = null;
+ try {
+ e = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e1) {
+ e1.printStackTrace();
+ }
+ while (e.hasMoreElements()) {
+
+ NetworkInterface n = (NetworkInterface) e.nextElement();
+
+ Enumeration ee = n.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress i = (InetAddress) ee.nextElement();
+ if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+ hostAddress = i.getHostAddress();
+ }
+ }
+ return hostAddress;
+
+ }
+
+ /**
+ * TODO: Change to use external routing table implementation
+ *
+ * @param msg
+ */
+ private void updateRoutingTable(Message msg) {
+ routingTable.put(msg.getRoute(), msg.getSender());
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
+
+ // Currently only registers rpc implementation.
+ // TODO: do registration for instance based routing
+ QName rpcType = route.getType();
+ RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+ }
+
+ /**
+ * @param socket
+ * @return
+ */
+ private Message parseMessage(ZMQ.Socket socket) {
+
+ Message msg = null;
+ try {
+ byte[] bytes = socket.recv();
+ System.out.println("Received bytes:[" + bytes.length + "]");
+ msg = (Message) Message.deserialize(bytes);
+ } catch (Throwable t) {
+ System.out.println("Caught exception");
+ t.printStackTrace();
+ }
+ return msg;
+ }
+
+ private class RpcFacade implements RpcImplementation {
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ RpcRequestImpl request = new RpcRequestImpl();
+ request.setRouteIdentifier(routeId);
+ request.setPayload(input);
+
+ final Future<RpcReply<Object>> ret = sendRpc(request);
+
+ //TODO: Review result handling
+ RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
+ @Override
+ public boolean isSuccessful() {
+ try {
+ ret.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public CompositeNode getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ return Collections.EMPTY_LIST;
+ }
+ };
+ return result;
+ }
+ }
+
+ /**
+ * Listener for rpc registrations
+ */
+ private class RpcListener implements RpcRegistrationListener {
+
+ @Override
+ public void onRpcImplementationAdded(QName name) {
+ System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
+
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ //TODO: Make notice immutable and change message type
+ Message notice = new Message.MessageBuilder()
+ .type(MessageType.ANNOUNCE)
+ .sender("tcp://" + localIp + ":" + rpcPort)
+ .route(routeId)
+ .build();
+
+ publish(notice);
+ }
+
+ @Override
+ public void onRpcImplementationRemoved(QName name) {
+ // TODO: send a rpc-deregistrtation notice
+
+ }
+ }
+
+ public void setBrokerSession(ProviderSession session) {
+ this.brokerSession = session;
+
+ }
+
+}
+++ /dev/null
-package org.opendaylight.controller.sal.connector.zeromq;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-
-import java.io.Serializable;
-
-/**
- * Created with IntelliJ IDEA.
- * User: abhishk2
- * Date: 10/24/13
- * Time: 4:25 PM
- * To change this template use File | Settings | File Templates.
- */
-public class RpcReplyImpl implements RpcRouter.RpcReply<Object>,Serializable {
-
- private Object payload;
-
- @Override
- public Object getPayload() {
- return payload; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setPayload(Object payload){
- this.payload = payload;
- }
-}
+++ /dev/null
-package org.opendaylight.controller.sal.connector.zeromq;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.*;
-import java.util.concurrent.*;
-
-public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
-
- private ExecutorService serverPool;
- private static ExecutorService handlersPool;
-
- private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
-
- private ProviderSession brokerSession;
-
- private ZMQ.Context context;
- private ZMQ.Socket publisher;
- private ZMQ.Socket subscriber;
- private ZMQ.Socket replySocket;
-
- private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
-
- private final RpcFacade facade = new RpcFacade();
- private final RpcListener listener = new RpcListener();
-
- private String pubPort = System.getProperty("pub.port");//port on which announcements are sent
- private String subPort = System.getProperty("sub.port");//other controller's pub port
- private String pubIp = System.getProperty("pub.ip"); //other controller's ip
- private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received
-
-
- private ZeroMqRpcRouter() {
- }
-
- public static ZeroMqRpcRouter getInstance() {
- return _instance;
- }
-
- public void start() {
- context = ZMQ.context(2);
- serverPool = Executors.newSingleThreadExecutor();
- handlersPool = Executors.newCachedThreadPool();
- routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
-
- // Start listening for announce and rpc messages
- serverPool.execute(receive());
-
-
- brokerSession.addRpcRegistrationListener(listener);
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for(QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
-
-
- }
-
- public void stop() {
- if (handlersPool != null) handlersPool.shutdown();
- if (serverPool != null) serverPool.shutdown();
- if (publisher != null) {
- publisher.setLinger(0);
- publisher.close();
- }
- if (replySocket != null) {
- replySocket.setLinger(0);
- replySocket.close();
- }
- if (subscriber != null) {
- subscriber.setLinger(0);
- subscriber.close();
- }
- if (context != null) context.term();
-
-
- }
-
- private Runnable receive() {
- return new Runnable() {
- public void run() {
- try {
- // Bind to RPC reply socket
- replySocket = context.socket(ZMQ.REP);
- replySocket.bind("tcp://*:" + rpcPort);
-
- // Bind to publishing controller
- subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://" + pubIp + ":" + subPort);
- System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]");
-
- subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
-
- // Initialize poll set
- ZMQ.Poller poller = new ZMQ.Poller(2);
- poller.register(replySocket, ZMQ.Poller.POLLIN);
- poller.register(subscriber, ZMQ.Poller.POLLIN);
-
- while (!Thread.currentThread().isInterrupted()) {
-
- poller.poll(250);
- //TODO: Fix this
- if (poller.pollin(0)) {
- //receive rpc request and reply
- try {
- Message req = parseMessage(replySocket);
- Message resp = new Message();
- //Call broker to process the message then reply
- Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
- RpcResult<CompositeNode> result = rpc.get();
- resp.setType(Message.MessageType.RESPONSE);
- resp.setSender(getLocalIpAddress() + ":" + rpcPort);
- resp.setRoute(req.getRoute());
- resp.setPayload(result.isSuccessful());
- replySocket.send(Message.serialize(resp));
-
- } catch (IOException ex) {// | ClassNotFoundException ex) {
- System.out.println("Rpc request could not be handled" + ex);
- }
- }
- if (poller.pollin(1)) {
- //get subscription and update routing table
- //try {
- Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv());
- System.out.println("Topic:[" + topic + "]");
-
- if (subscriber.hasReceiveMore()) {
- try {
- Message m = (Message) Message.deserialize(subscriber.recv());
- System.out.println(m);
- //TODO: check on msg type or topic. Both should be same. Need to normalize.
- if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m);
- } catch (IOException | ClassNotFoundException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-//
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- replySocket.setLinger(0);
- replySocket.close();
- subscriber.setLinger(0);
- subscriber.close();
- }
- };
- }
-
- private void updateRoutingTable(Message msg) {
- routingTable.put(msg.getRoute(), msg.getSender());
- RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
- QName rpcType = route.getType();
- System.out.println("Routing Table\n" + routingTable);
-
- RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
- }
-
- private Message parseMessage(ZMQ.Socket socket) {
- //Message m = new Message();
- //socket.setReceiveBufferSize(40000);
- Message msg = null;
- try {
- byte[] bytes = socket.recv();
- System.out.println("Received bytes:[" + bytes.length + "]");
- msg = (Message) Message.deserialize(bytes);
- } catch (Throwable t) {
- System.out.println("Caught exception");
- t.printStackTrace();
- }
- return msg;
- /*m.setType((Message.MessageType) Message.deserialize(socket.recv()));
-
- if (socket.hasReceiveMore()) {
- m.setSender((String) Message.deserialize(socket.recv()));
- }
- if (socket.hasReceiveMore()) {
- m.setRoute((RouteIdentifier) Message.deserialize(socket.recv()));
- }
- if (socket.hasReceiveMore()) {
- m.setPayload(Message.deserialize(socket.recv()));
- }
- return m;*/
- }
-
- @Override
- public Future<RpcReply<Object>> sendRpc(final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
-
- return handlersPool.submit(new Callable<RpcReply<Object>>() {
-
- @Override
- public RpcReply<Object> call() {
- ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
- Message req = new Message();
- Message resp = null;
- RpcReplyImpl reply = new RpcReplyImpl();
- requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute()));
-
- req.setType(Message.MessageType.REQUEST);
- req.setSender(getLocalIpAddress() + ":" + rpcPort);
- req.setRoute(input.getRoutingInformation());
- req.setPayload(input.getPayload());
- try {
- requestSocket.send(Message.serialize(req));
- resp = parseMessage(requestSocket);
- reply.setPayload(resp.getPayload());
- } catch (IOException ex) {//| ClassNotFoundException ex) {
- //Log and ignore
- System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
- }
-
- return reply;
- }
- });
- }
-
- public void publish(final Message message) {
- Runnable task = new Runnable() {
- public void run() {
- // Bind to publishing port
- publisher = context.socket(ZMQ.PUB);
- publisher.bind("tcp://*:" + pubPort);
- System.out.println("Publisher started at port[" + pubPort + "]");
- try {
- Message outMessage = new Message();
- outMessage.setType(Message.MessageType.ANNOUNCE);
- outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort);
- outMessage.setRoute(message.getRoute());
-
- System.out.println("Sending announcement[" + outMessage + "]");
- publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
- publisher.send(Message.serialize(outMessage));
-
- } catch (IOException ex) {
- //Log and ignore
- System.out.println("Error in publishing");
- ex.printStackTrace();
- }
- System.out.println("Published message[" + message + "]");
- publisher.close();
- }
- };
- handlersPool.execute(task);
- }
-
- private String getLocalIpAddress() {
- String hostAddress = null;
- Enumeration e = null;
- try {
- e = NetworkInterface.getNetworkInterfaces();
- } catch (SocketException e1) {
- e1.printStackTrace();
- }
- while (e.hasMoreElements()) {
-
- NetworkInterface n = (NetworkInterface) e.nextElement();
- Enumeration ee = n.getInetAddresses();
- while (ee.hasMoreElements()) {
- InetAddress i = (InetAddress) ee.nextElement();
- if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
- hostAddress = i.getHostAddress();
- }
- }
-
- return hostAddress;
-
-
- }
-
-
- private class RpcFacade implements RpcImplementation {
-
-
- @Override
- public Set<QName> getSupportedRpcs() {
- return Collections.emptySet();
- }
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-
- RpcRequestImpl request = new RpcRequestImpl();
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setContext(null);
- routeId.setRoute(null);
- routeId.setType(rpc);
-
- request.setRouteIdentifier(routeId);
- request.setPayload(input);
- // Create message
-
- Future<org.opendaylight.controller.sal.connector.api.RpcRouter.RpcReply<Object>> ret = sendRpc(request);
-
- return null;
- }
- }
-
- private class RpcListener implements RpcRegistrationListener {
-
- @Override
- public void onRpcImplementationAdded(QName name) {
-
- Message msg = new Message();
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
- msg.setRoute(routeId);
- publish(msg);
- }
-
- @Override
- public void onRpcImplementationRemoved(QName name) {
- // TODO Auto-generated method stub
-
- }
- }
-
- public void setBrokerSession(ProviderSession session) {
- this.brokerSession = session;
-
- }
-
-}