Updating ZeroMQ connector implementation. Its a work inprogress. The current implemen... 67/2467/2
authorAbhishek Kumar <abhishk2@cisco.com>
Wed, 6 Nov 2013 22:57:57 +0000 (14:57 -0800)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 8 Nov 2013 19:47:12 +0000 (19:47 +0000)
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
Change-Id: I218fa7a99cfe4bce94f5959b47d835a88cb76b92

opendaylight/md-sal/sal-zeromq-connector/pom.xml
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Activator.java with 88% similarity]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/Message.java with 70% similarity]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RouteIdentifierImpl.java with 93% similarity]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RpcRequestImpl.java [moved from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcRequestImpl.java with 93% similarity]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java [deleted file]
opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java [deleted file]

index 3db4a65840dd6e1f4356cee52075878796da6d6f..72e49be4de49899f111aeb55ffe65426b0eb3d1e 100644 (file)
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>sal-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
 
+  <artifactId>sal-zeromq-connector</artifactId>
+  <packaging>bundle</packaging>
 
-    <artifactId>sal-zeromq-connector</artifactId>
-    <packaging>bundle</packaging>
+  <properties>
+    <scala.version>2.10.3</scala.version>
+  </properties>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <version>${bundle.plugin.version}</version>
-                <extensions>true</extensions>
-                <configuration>
-                    <instructions>
-                        <Import-Package>
-                        org.opendaylight.controller.sal.connector.api,
-                        org.opendaylight.controller.sal.core.api,
-                        org.opendaylight.yangtools.concepts;version="[0.1,1)",
-                        org.opendaylight.yangtools.yang.common;version="[0.5,1)",
-                        org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
-                        org.zeromq;version="[0.3,1)"
-                        </Import-Package>
-                        <Bundle-Activator>org.opendaylight.controller.sal.connector.zeromq.Activator</Bundle-Activator>
-                    </instructions>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-    <dependencies>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>containermanager</artifactId>
-            <version>0.5.1-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>commons.northbound</artifactId>
-            <version>0.4.1-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal</artifactId>
-            <version>0.5.1-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.yangtools</groupId>
-            <artifactId>yang-binding</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.yangtools</groupId>
-            <artifactId>yang-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal-connector-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>sal-common-util</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>${bundle.plugin.version}</version>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Import-Package>
+              org.opendaylight.controller.sal.connector.api,
+              org.opendaylight.controller.sal.core.api,
+              org.opendaylight.yangtools.concepts;version="[0.1,1)",
+              org.opendaylight.yangtools.yang.common;version="[0.5,1)",
+              org.opendaylight.yangtools.yang.data.api;version="[0.5,1)",
+              org.zeromq;version="[0.3,1)"
+            </Import-Package>
+            <Bundle-Activator>org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Activator</Bundle-Activator>
+          </instructions>
+        </configuration>
+      </plugin>
 
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jeromq</groupId>
-            <artifactId>jeromq</artifactId>
-            <version>0.3.0-SNAPSHOT</version>
-        </dependency>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.1.6</version>
+        <configuration>
+          <recompileMode>incremental</recompileMode>
+          <args>
+            <arg>-target:jvm-1.7</arg>
+          </args>
+          <javacArgs>
+            <javacArg>-source</javacArg><javacArg>1.7</javacArg>
+            <javacArg>-target</javacArg><javacArg>1.7</javacArg>
+          </javacArgs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>scala-compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
 
-    </dependencies>
-    <repositories>
-        <repository>
-            <id>sonatype-nexus-snapshots</id>
-            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
-            <releases>
-                <enabled>false</enabled>
-            </releases>
-            <snapshots>
-                <enabled>true</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-compile</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>default-testCompile</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>containermanager</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>commons.northbound</artifactId>
+      <version>0.4.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-binding</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-connector-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-util</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.jeromq</groupId>
+      <artifactId>jeromq</artifactId>
+      <version>0.3.0-SNAPSHOT</version>
+    </dependency>
+
+  </dependencies>
+  <repositories>
+    <repository>
+      <id>sonatype-nexus-snapshots</id>
+      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
 
 </project>
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java
new file mode 100644 (file)
index 0000000..ba90f37
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc.api;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RouteChange<I, R> {
+
+  Map<I, Set<R>> getRemovals();
+  Map<I, Set<R>> getAnnouncements();
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChangeListener.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChangeListener.java
new file mode 100644 (file)
index 0000000..701cfaf
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc.api;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.util.EventListener;
+
+public interface RouteChangeListener extends EventListener {
+
+  public void onRouteChanged(RouteChange<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>  change);
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java
new file mode 100644 (file)
index 0000000..3c6c42e
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc.api;
+
+import java.util.Set;
+
+public interface RoutingTable<I,R> {
+
+  /**
+   * Adds a network address for the route. If address for route
+   * exists, appends the address to the list
+   *
+   * @param routeId route identifier
+   * @param route network address
+   */
+  public void addRoute(I routeId, R route);
+
+  /**
+   * Adds a network address for the route. If the route already exists,
+   * it throws. This method would be used when registering a global service.
+   *
+   * @param routeId route identifier
+   * @param route network address
+   * @throws DuplicateRouteException
+   */
+  public void addGlobalRoute(I routeId, R route) throws DuplicateRouteException;
+
+  /**
+   * Removes the network address for the route from routing table. If only
+   * one network address existed, remove the route as well.
+   * @param routeId
+   * @param route
+   */
+  public void removeRoute(I routeId, R route);
+
+  /**
+   * Returns a set of network addresses associated with this route
+   * @param routeId
+   * @return
+   */
+  public Set<R> getRoutes(I routeId);
+
+  /**
+   * Returns only one address from the list of network addresses
+   * associated with the route. The algorithm to determine that
+   * one address is upto the implementer
+   * @param route
+   * @return
+   */
+  public R getARoute(I routeId);
+
+  public void registerRouteChangeListener(RouteChangeListener listener);
+
+  public class DuplicateRouteException extends Exception {}
+}
@@ -1,16 +1,15 @@
-package org.opendaylight.controller.sal.connector.zeromq;
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
 
 
 import org.codehaus.jackson.map.ObjectMapper;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 import java.io.*;
-import java.util.Arrays;
 
 public class Message implements Serializable {
 
- public enum MessageType {
-    ANNOUNCE((byte) 0),
+ public static enum MessageType {
+    ANNOUNCE((byte) 0),  //TODO: Remove announce, add rpc registration and deregistration
     HEARTBEAT((byte) 1),
     REQUEST((byte) 2),
     RESPONSE((byte) 3);
@@ -101,27 +100,15 @@ public class Message implements Serializable {
     return o.readObject();
   }
 
-  public static byte[] toJsonBytes(Message m){
+  public static byte[] toJsonBytes(Message m) throws IOException {
     ObjectMapper o = new ObjectMapper();
-    try {
-      System.out.println(o.writeValueAsString(m));
-      return o.writeValueAsBytes(m);
-    } catch (IOException e) {
-      e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-    }
-    return null;
+    return o.writeValueAsBytes(m);
   }
 
-  public static Message fromJsonBytes(byte [] bytes){
+  public static Message fromJsonBytes(byte [] bytes) throws IOException {
 
     ObjectMapper o = new ObjectMapper();
-    Message m = null;
-    try {
-      m = o.readValue(bytes, Message.class);
-    } catch (IOException e) {
-      e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-    }
-    return m;
+    return o.readValue(bytes, Message.class);
   }
 
   public static class Response extends Message implements RpcRouter.RpcReply {
@@ -146,5 +133,41 @@ public class Message implements Serializable {
     }
   }
 
+  /**
+   * Builds a {@link Message} object
+   */
+  public static class MessageBuilder{
+
+    private Message message;
+
+    public MessageBuilder(){
+      message = new Message();
+    }
+
+
+    public MessageBuilder type(MessageType type){
+      message.setType(type);
+      return this;
+    }
+
+    public MessageBuilder sender(String sender){
+      message.setSender(sender);
+      return this;
+    }
+
+    public MessageBuilder route(RpcRouter.RouteIdentifier route){
+      message.setRoute(route);
+      return this;
+    }
+
+    public MessageBuilder payload(Object obj){
+      message.setPayload(obj);
+      return this;
+    }
+
+    public Message build(){
+      return message;
+    }
+  }
 }
 
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java
new file mode 100644 (file)
index 0000000..acb733d
--- /dev/null
@@ -0,0 +1,448 @@
+package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.zeromq.ZMQ;
+
+/**
+ * ZeroMq based implementation of RpcRouter
+ * TODO:
+ *    1. Make it multi VM aware
+ *    2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
+ *    3. sendRpc() should use connection pooling
+ *    4. Read properties from config file using existing(?) ODL properties framework
+ */
+public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
+
+  private ExecutorService serverPool;
+  private static ExecutorService handlersPool;
+
+  private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
+
+  private ProviderSession brokerSession;
+
+  private ZMQ.Context context;
+  private ZMQ.Socket publisher;
+  private ZMQ.Socket subscriber;
+  private ZMQ.Socket replySocket;
+
+  private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
+
+  private final RpcFacade facade = new RpcFacade();
+  private final RpcListener listener = new RpcListener();
+
+  private final String localIp = getLocalIpAddress();
+
+  private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
+  private String subPort = System.getProperty("sub.port");// other controller's pub port
+  private String pubIp = System.getProperty("pub.ip"); // other controller's ip
+  private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
+
+  //Prevent instantiation
+  private ZeroMqRpcRouter() {
+  }
+
+  public static ZeroMqRpcRouter getInstance() {
+    return _instance;
+  }
+
+  public void start() {
+    context = ZMQ.context(2);
+    publisher = context.socket(ZMQ.PUB);
+    int ret = publisher.bind("tcp://*:" + pubPort);
+    System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
+    // serverPool = Executors.newSingleThreadExecutor();
+    serverPool = Executors.newCachedThreadPool();
+    handlersPool = Executors.newCachedThreadPool();
+    routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
+
+    // Start listening for announce and rpc messages
+    serverPool.execute(receive());
+
+    brokerSession.addRpcRegistrationListener(listener);
+
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationAdded(rpc);
+    }
+
+  }
+
+  public void stop() {
+    if (handlersPool != null)
+      handlersPool.shutdown();
+    if (serverPool != null)
+      serverPool.shutdown();
+    if (publisher != null) {
+      publisher.setLinger(0);
+      publisher.close();
+    }
+    if (replySocket != null) {
+      replySocket.setLinger(0);
+      replySocket.close();
+    }
+    if (subscriber != null) {
+      subscriber.setLinger(0);
+      subscriber.close();
+    }
+    if (context != null)
+      context.term();
+
+  }
+
+  private Runnable receive() {
+    return new Runnable() {
+      public void run() {
+        try {
+          // Bind to RPC reply socket
+          replySocket = context.socket(ZMQ.REP);
+          replySocket.bind("tcp://*:" + rpcPort);
+
+          // Bind to publishing controller
+          subscriber = context.socket(ZMQ.SUB);
+          subscriber.connect("tcp://" + pubIp + ":" + subPort);
+          System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
+              + pubIp + ":" + subPort + "]");
+
+          //subscribe for announcements
+          //TODO: Message type would be changed. Update this
+          subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
+
+          // Poller enables listening on multiple sockets using a single thread
+          ZMQ.Poller poller = new ZMQ.Poller(2);
+          poller.register(replySocket, ZMQ.Poller.POLLIN);
+          poller.register(subscriber, ZMQ.Poller.POLLIN);
+          System.out.println(Thread.currentThread().getName() + "-Start Polling");
+
+          //TODO: Add code to restart the thread after exception
+          while (!Thread.currentThread().isInterrupted()) {
+
+            poller.poll();
+
+            if (poller.pollin(0)) {
+              handleRpcCall();
+            }
+            if (poller.pollin(1)) {
+              handleAnnouncement();
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        replySocket.setLinger(0);
+        replySocket.close();
+        subscriber.setLinger(0);
+        subscriber.close();
+      }
+    };
+  }
+
+  /**
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  private void handleAnnouncement() throws IOException, ClassNotFoundException {
+    System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
+    Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
+    System.out.println("Topic:[" + topic + "]");
+
+    if (subscriber.hasReceiveMore()) {
+      try {
+        Message m = (Message) Message.deserialize(subscriber.recv());
+        System.out.println(m);
+        // TODO: check on msg type or topic. Both
+        // should be same. Need to normalize.
+        if (Message.MessageType.ANNOUNCE == m.getType())
+          updateRoutingTable(m);
+      } catch (IOException | ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+
+  }
+
+  /**
+   * @throws InterruptedException
+   * @throws ExecutionException
+   */
+  private void handleRpcCall() throws InterruptedException, ExecutionException {
+    try {
+      Message req = parseMessage(replySocket);
+
+      System.out.println("Received RPC request [" + req + "]");
+
+      // Call broker to process the message then reply
+      Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
+          (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
+
+      RpcResult<CompositeNode> result = rpc.get();
+
+      Message response = new Message.MessageBuilder()
+          .type(MessageType.RESPONSE)
+          .sender(localIp + ":" + rpcPort)
+          .route(req.getRoute())
+          //.payload(result)    TODO: enable and test
+          .build();
+
+      replySocket.send(Message.serialize(response));
+
+      System.out.println("Sent RPC response [" + response + "]");
+
+    } catch (IOException ex) {
+      //TODO: handle exception and send error codes to caller
+      System.out.println("Rpc request could not be handled" + ex);
+    }
+  }
+
+
+  @Override
+  public Future<RpcReply<Object>> sendRpc(
+      final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
+
+    return handlersPool.submit(new Callable<RpcReply<Object>>() {
+
+      @Override
+      public RpcReply<Object> call() {
+        ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
+
+        // TODO pick the ip and port from routing table based on routing identifier
+        requestSocket.connect("tcp://" + pubIp + ":5554");
+
+        Message requestMessage = new Message.MessageBuilder()
+            .type(MessageType.REQUEST)
+            .sender(localIp + ":" + rpcPort)
+            .route(input.getRoutingInformation())
+            .payload(input.getPayload())
+            .build();
+
+        RpcReply<Object> reply = null;
+
+        try {
+          System.out.println("\n\nRPC Request [" + requestMessage + "]");
+
+          requestSocket.send(Message.serialize(requestMessage));
+          final Message resp = parseMessage(requestSocket);
+
+          System.out.println("\n\nRPC Response [" + resp + "]");
+
+          reply = new RpcReply<Object>() {
+
+            @Override
+            public Object getPayload() {
+              return resp.getPayload();
+            }
+          };
+        } catch (IOException ex) {
+          // TODO: Pass exception back to the caller
+          System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
+        }
+
+        return reply;
+      }
+    });
+  }
+
+  /**
+   * TODO: Remove this implementation and use RoutingTable implementation to send announcements
+   * Publishes a notice to other controllers in the cluster
+   *
+   * @param notice
+   */
+  public void publish(final Message notice) {
+    Runnable task = new Runnable() {
+      public void run() {
+
+        System.out.println(
+            Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
+
+        try {
+
+          System.out.println(
+              Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
+
+          publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
+          publisher.send(Message.serialize(notice));
+
+        } catch (IOException ex) {
+          System.out.println("Error in publishing");
+          ex.printStackTrace();
+        }
+        System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
+            + "]");
+
+      }
+    };
+    handlersPool.execute(task);
+  }
+
+  /**
+   * Finds IPv4 address of the local VM
+   * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+   * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+   * Should we use IP or hostname?
+   *
+   * @return
+   */
+  private String getLocalIpAddress() {
+    String hostAddress = null;
+    Enumeration e = null;
+    try {
+      e = NetworkInterface.getNetworkInterfaces();
+    } catch (SocketException e1) {
+      e1.printStackTrace();
+    }
+    while (e.hasMoreElements()) {
+
+      NetworkInterface n = (NetworkInterface) e.nextElement();
+
+      Enumeration ee = n.getInetAddresses();
+      while (ee.hasMoreElements()) {
+        InetAddress i = (InetAddress) ee.nextElement();
+        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+          hostAddress = i.getHostAddress();
+      }
+    }
+    return hostAddress;
+
+  }
+
+  /**
+   * TODO: Change to use external routing table implementation
+   *
+   * @param msg
+   */
+  private void updateRoutingTable(Message msg) {
+    routingTable.put(msg.getRoute(), msg.getSender());
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
+
+    // Currently only registers rpc implementation.
+    // TODO: do registration for instance based routing
+    QName rpcType = route.getType();
+    RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
+  }
+
+  /**
+   * @param socket
+   * @return
+   */
+  private Message parseMessage(ZMQ.Socket socket) {
+
+    Message msg = null;
+    try {
+      byte[] bytes = socket.recv();
+      System.out.println("Received bytes:[" + bytes.length + "]");
+      msg = (Message) Message.deserialize(bytes);
+    } catch (Throwable t) {
+      System.out.println("Caught exception");
+      t.printStackTrace();
+    }
+    return msg;
+  }
+
+  private class RpcFacade implements RpcImplementation {
+
+    @Override
+    public Set<QName> getSupportedRpcs() {
+      return Collections.emptySet();
+    }
+
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(rpc);
+
+      RpcRequestImpl request = new RpcRequestImpl();
+      request.setRouteIdentifier(routeId);
+      request.setPayload(input);
+
+      final Future<RpcReply<Object>> ret = sendRpc(request);
+
+      //TODO: Review result handling
+      RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
+        @Override
+        public boolean isSuccessful() {
+          try {
+            ret.get();
+          } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+            return false;
+          }
+          return true;
+        }
+
+        @Override
+        public CompositeNode getResult() {
+          return null;
+        }
+
+        @Override
+        public Collection<RpcError> getErrors() {
+          return Collections.EMPTY_LIST;
+        }
+      };
+      return result;
+    }
+  }
+
+  /**
+   * Listener for rpc registrations
+   */
+  private class RpcListener implements RpcRegistrationListener {
+
+    @Override
+    public void onRpcImplementationAdded(QName name) {
+      System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
+
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(name);
+
+      //TODO: Make notice immutable and change message type
+      Message notice = new Message.MessageBuilder()
+          .type(MessageType.ANNOUNCE)
+          .sender("tcp://" + localIp + ":" + rpcPort)
+          .route(routeId)
+          .build();
+
+      publish(notice);
+    }
+
+    @Override
+    public void onRpcImplementationRemoved(QName name) {
+      // TODO: send a rpc-deregistrtation notice
+
+    }
+  }
+
+  public void setBrokerSession(ProviderSession session) {
+    this.brokerSession = session;
+
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/RpcReplyImpl.java
deleted file mode 100644 (file)
index 66ff714..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.opendaylight.controller.sal.connector.zeromq;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-
-import java.io.Serializable;
-
-/**
- * Created with IntelliJ IDEA.
- * User: abhishk2
- * Date: 10/24/13
- * Time: 4:25 PM
- * To change this template use File | Settings | File Templates.
- */
-public class RpcReplyImpl implements RpcRouter.RpcReply<Object>,Serializable {
-
-  private Object payload;
-
-  @Override
-  public Object getPayload() {
-    return payload;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  public void setPayload(Object payload){
-    this.payload = payload;
-  }
-}
diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java
deleted file mode 100644 (file)
index 7e5efda..0000000
+++ /dev/null
@@ -1,337 +0,0 @@
-package org.opendaylight.controller.sal.connector.zeromq;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.*;
-import java.util.concurrent.*;
-
-public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
-
-  private ExecutorService serverPool;
-  private static ExecutorService handlersPool;
-
-  private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
-
-  private ProviderSession brokerSession;
-
-  private ZMQ.Context context;
-  private ZMQ.Socket publisher;
-  private ZMQ.Socket subscriber;
-  private ZMQ.Socket replySocket;
-
-  private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
-
-  private final RpcFacade facade = new RpcFacade();
-  private final RpcListener listener = new RpcListener();
-
-  private String pubPort = System.getProperty("pub.port");//port on which announcements are sent
-  private String subPort = System.getProperty("sub.port");//other controller's pub port
-  private String pubIp = System.getProperty("pub.ip");    //other controller's ip
-  private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received
-
-
-  private ZeroMqRpcRouter() {
-  }
-
-  public static ZeroMqRpcRouter getInstance() {
-    return _instance;
-  }
-
-  public void start() {
-    context = ZMQ.context(2);
-    serverPool = Executors.newSingleThreadExecutor();
-    handlersPool = Executors.newCachedThreadPool();
-    routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
-
-    // Start listening for announce and rpc messages
-    serverPool.execute(receive());
-
-    
-    brokerSession.addRpcRegistrationListener(listener);
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for(QName rpc : currentlySupported) {
-        listener.onRpcImplementationAdded(rpc);
-    }
-
-
-  }
-
-  public void stop() {
-    if (handlersPool != null) handlersPool.shutdown();
-    if (serverPool != null) serverPool.shutdown();
-    if (publisher != null) {
-      publisher.setLinger(0);
-      publisher.close();
-    }
-    if (replySocket != null) {
-      replySocket.setLinger(0);
-      replySocket.close();
-    }
-    if (subscriber != null) {
-      subscriber.setLinger(0);
-      subscriber.close();
-    }
-    if (context != null) context.term();
-
-
-  }
-
-  private Runnable receive() {
-    return new Runnable() {
-      public void run() {
-        try {
-          // Bind to RPC reply socket
-          replySocket = context.socket(ZMQ.REP);
-          replySocket.bind("tcp://*:" + rpcPort);
-
-          // Bind to publishing controller
-          subscriber = context.socket(ZMQ.SUB);
-          subscriber.connect("tcp://" + pubIp + ":" + subPort);
-          System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]");
-
-          subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
-
-          // Initialize poll set
-          ZMQ.Poller poller = new ZMQ.Poller(2);
-          poller.register(replySocket, ZMQ.Poller.POLLIN);
-          poller.register(subscriber, ZMQ.Poller.POLLIN);
-
-          while (!Thread.currentThread().isInterrupted()) {
-
-            poller.poll(250);
-            //TODO: Fix this
-            if (poller.pollin(0)) {
-              //receive rpc request and reply
-              try {
-                Message req = parseMessage(replySocket);
-                Message resp = new Message();
-                //Call broker to process the message then reply
-                Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
-                RpcResult<CompositeNode> result = rpc.get();
-                resp.setType(Message.MessageType.RESPONSE);
-                resp.setSender(getLocalIpAddress() + ":" + rpcPort);
-                resp.setRoute(req.getRoute());
-                resp.setPayload(result.isSuccessful());
-                replySocket.send(Message.serialize(resp));
-
-              } catch (IOException ex) {// | ClassNotFoundException ex) {
-                System.out.println("Rpc request could not be handled" + ex);
-              }
-            }
-            if (poller.pollin(1)) {
-              //get subscription and update routing table
-              //try {
-              Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv());
-              System.out.println("Topic:[" + topic + "]");
-
-              if (subscriber.hasReceiveMore()) {
-                try {
-                  Message m = (Message) Message.deserialize(subscriber.recv());
-                  System.out.println(m);
-                  //TODO: check on msg type or topic. Both should be same. Need to normalize.
-                  if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m);
-                } catch (IOException | ClassNotFoundException e) {
-                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-              }
-//
-            }
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-        replySocket.setLinger(0);
-        replySocket.close();
-        subscriber.setLinger(0);
-        subscriber.close();
-      }
-    };
-  }
-
-  private void updateRoutingTable(Message msg) {
-    routingTable.put(msg.getRoute(), msg.getSender());
-    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
-    QName rpcType = route.getType();
-    System.out.println("Routing Table\n" + routingTable);
-
-    RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
-  }
-
-  private Message parseMessage(ZMQ.Socket socket) {
-    //Message m = new Message();
-    //socket.setReceiveBufferSize(40000);
-    Message msg = null;
-    try {
-      byte[] bytes = socket.recv();
-      System.out.println("Received bytes:[" + bytes.length + "]");
-      msg = (Message) Message.deserialize(bytes);
-    } catch (Throwable t) {
-      System.out.println("Caught exception");
-      t.printStackTrace();
-    }
-    return msg;
-    /*m.setType((Message.MessageType) Message.deserialize(socket.recv()));
-
-    if (socket.hasReceiveMore()) {
-      m.setSender((String) Message.deserialize(socket.recv()));
-    }
-    if (socket.hasReceiveMore()) {
-      m.setRoute((RouteIdentifier) Message.deserialize(socket.recv()));
-    }
-    if (socket.hasReceiveMore()) {
-      m.setPayload(Message.deserialize(socket.recv()));
-    }
-    return m;*/
-  }
-
-  @Override
-  public Future<RpcReply<Object>> sendRpc(final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
-
-    return handlersPool.submit(new Callable<RpcReply<Object>>() {
-
-      @Override
-      public RpcReply<Object> call() {
-        ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
-        Message req = new Message();
-        Message resp = null;
-        RpcReplyImpl reply = new RpcReplyImpl();
-        requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute()));
-
-        req.setType(Message.MessageType.REQUEST);
-        req.setSender(getLocalIpAddress() + ":" + rpcPort);
-        req.setRoute(input.getRoutingInformation());
-        req.setPayload(input.getPayload());
-        try {
-          requestSocket.send(Message.serialize(req));
-          resp = parseMessage(requestSocket);
-          reply.setPayload(resp.getPayload());
-        } catch (IOException ex) {//| ClassNotFoundException ex) {
-          //Log and ignore
-          System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
-        }
-
-        return reply;
-      }
-    });
-  }
-
-  public void publish(final Message message) {
-    Runnable task = new Runnable() {
-      public void run() {
-        // Bind to publishing port
-        publisher = context.socket(ZMQ.PUB);
-        publisher.bind("tcp://*:" + pubPort);
-        System.out.println("Publisher started at port[" + pubPort + "]");
-        try {
-          Message outMessage =  new Message();
-          outMessage.setType(Message.MessageType.ANNOUNCE);
-          outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort);
-          outMessage.setRoute(message.getRoute());
-
-          System.out.println("Sending announcement[" + outMessage + "]");
-          publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
-          publisher.send(Message.serialize(outMessage));
-
-        } catch (IOException ex) {
-          //Log and ignore
-          System.out.println("Error in publishing");
-          ex.printStackTrace();
-        }
-        System.out.println("Published message[" + message + "]");
-        publisher.close();
-      }
-    };
-    handlersPool.execute(task);
-  }
-
-  private String getLocalIpAddress() {
-    String hostAddress = null;
-    Enumeration e = null;
-    try {
-      e = NetworkInterface.getNetworkInterfaces();
-    } catch (SocketException e1) {
-      e1.printStackTrace();
-    }
-    while (e.hasMoreElements()) {
-
-      NetworkInterface n = (NetworkInterface) e.nextElement();
-      Enumeration ee = n.getInetAddresses();
-      while (ee.hasMoreElements()) {
-        InetAddress i = (InetAddress) ee.nextElement();
-        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
-          hostAddress = i.getHostAddress();
-      }
-    }
-
-    return hostAddress;
-
-
-  }
-
-
-  private class RpcFacade implements RpcImplementation {
-
-
-    @Override
-    public Set<QName> getSupportedRpcs() {
-      return Collections.emptySet();
-    }
-
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-
-      RpcRequestImpl request = new RpcRequestImpl();
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setContext(null);
-      routeId.setRoute(null);
-      routeId.setType(rpc);
-
-      request.setRouteIdentifier(routeId);
-      request.setPayload(input);
-      // Create message
-
-      Future<org.opendaylight.controller.sal.connector.api.RpcRouter.RpcReply<Object>> ret = sendRpc(request);
-
-      return null;
-    }
-  }
-
-  private class RpcListener implements RpcRegistrationListener {
-
-    @Override
-    public void onRpcImplementationAdded(QName name) {
-
-      Message msg = new Message();
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-      msg.setRoute(routeId);
-      publish(msg);
-    }
-
-    @Override
-    public void onRpcImplementationRemoved(QName name) {
-      // TODO Auto-generated method stub
-
-    }
-  }
-
-  public void setBrokerSession(ProviderSession session) {
-    this.brokerSession = session;
-
-  }
-
-}