Enhanced message handling 38/5338/9
authorAbhishek Kumar <abhishk2@cisco.com>
Fri, 14 Feb 2014 22:39:35 +0000 (14:39 -0800)
committerTony Tkacik <ttkacik@cisco.com>
Tue, 4 Mar 2014 09:05:46 +0000 (09:05 +0000)
-Added support for ping/pong messages
-Improved logging
-Refactored code, added MessageHandler for better handling of message
-Updated dependency version in pom

Change-Id: I04ff607c287afbe8a2ea156a0f8cd9029c1504ab
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml [new file with mode: 0644]

index 30e11c0..84df2e4 100644 (file)
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -140,9 +141,29 @@ public class ClientImpl implements RemoteRpcClient {
       Message response = handler.handle(request);
       CompositeNode payload = null;
 
-      if ( response != null )
-        payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+      if ( response != null ) {
 
+        _logger.info("Received response [{}]", response);
+
+        Object rawPayload = response.getPayload();
+        switch (response.getType()) {
+          case ERROR:
+            if ( rawPayload instanceof List )
+              errors = (List) rawPayload;
+              break;
+
+          case RESPONSE:
+            payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
+            break;
+
+          default:
+            errors.add(
+                RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
+            );
+            break;
+
+        }
+      }
       return Rpcs.getRpcResult(true, payload, errors);
 
     } catch (Exception e){
index f3ef4b6..fe70fb7 100644 (file)
@@ -90,6 +90,7 @@ class ClientRequestHandler implements AutoCloseable{
     //otherwise first create the bridge and then send request
     if ( connectedServers.containsKey(remoteServerAddress) )
       return sendMessage(request, remoteServerAddress);
+
     else{
       workerPool.execute(new Worker(remoteServerAddress));
       connectedServers.put(remoteServerAddress, remoteServerAddress);
@@ -105,12 +106,15 @@ class ClientRequestHandler implements AutoCloseable{
     ZMQ.Socket socket = context.socket(ZMQ.REQ);
 
     try {
-      socket.connect( INPROC_PROTOCOL_PREFIX + address);
+      String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address;
+      socket.connect( inProcessSocketAddress );
+      _logger.debug("Sending request [{}]", request);
       socket.send(Message.serialize(request));
-      _logger.debug("Request sent. Waiting for reply...");
+      _logger.info("Request sent. Waiting for reply...");
       byte[] reply = socket.recv(0);
-      _logger.debug("Response received");
+      _logger.info("Response received");
       response = (Message) Message.deserialize(reply);
+      _logger.debug("Response [{}]", response);
     } finally {
       socket.close();
     }
@@ -143,7 +147,7 @@ class ClientRequestHandler implements AutoCloseable{
    */
   private class Worker implements Runnable {
     private String name;
-    private String remoteServer;  //<servername:rpc-port>
+    private String remoteServer;  //<serverip:rpc-port>
 
     public Worker(String address){
       this.name = DEFAULT_NAME + "[" + address + "]";
index 949e6ee..2041f03 100644 (file)
@@ -8,10 +8,13 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.slf4j.Logger;
@@ -21,6 +24,7 @@ import org.zeromq.ZMQ;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Collection;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -101,11 +105,12 @@ public class ServerRequestHandler implements AutoCloseable{
 
         while (!Thread.currentThread().isInterrupted()) {
 
-          Message request = parseMessage(socket);
-          _logger.debug("Received rpc request [{}]", request);
+          MessageHandler handler = new MessageHandler(socket);
+          handler.receiveMessage();
 
-          if (request != null) {
-            // Call broker to process the message then reply
+          if (handler.hasMessageForBroker()) {
+
+            Message request = handler.getMessage();
             Future<RpcResult<CompositeNode>> rpc = null;
             RpcResult<CompositeNode> result = null;
 
@@ -117,28 +122,14 @@ public class ServerRequestHandler implements AutoCloseable{
 
               result = (rpc != null) ? rpc.get() : null;
 
-            } catch (Exception e) {
-              _logger.debug("Broker threw  [{}]", e);
-            }
-
-            CompositeNode payload = (result != null) ? result.getResult() : null;
-
-            Message response = new Message.MessageBuilder()
-                .type(Message.MessageType.RESPONSE)
-                .sender(serverAddress)
-                .route(request.getRoute())
-                .payload(XmlUtils.compositeNodeToXml(payload))
-                .build();
+              handler.sendResponse(result);
 
-            _logger.debug("Sending rpc response [{}]", response);
-
-            try {
-              socket.send(Message.serialize(response));
             } catch (Exception e) {
-              _logger.debug("rpc response send failed for message [{}]", response);
-              _logger.debug("{}", e);
+              _logger.debug("Broker threw  [{}]", e);
+              handler.sendError(e.getMessage());
             }
           }
+
         }
       } catch (Exception e) {
         printException(e);
@@ -147,16 +138,6 @@ public class ServerRequestHandler implements AutoCloseable{
       }
     }
 
-    /**
-     * @param socket
-     * @return
-     */
-    private Message parseMessage(ZMQ.Socket socket) throws Exception {
-      byte[] bytes = socket.recv(); //this blocks
-      _logger.debug("Received bytes:[{}]", bytes.length);
-      return (Message) Message.deserialize(bytes);
-    }
-
     private void printException(Exception e) {
       try (StringWriter s = new StringWriter();
            PrintWriter p = new PrintWriter(s)) {
@@ -204,4 +185,107 @@ public class ServerRequestHandler implements AutoCloseable{
       super.afterExecute(r, null);
     }
   }
+
+  class MessageHandler{
+    private ZMQ.Socket socket;
+    private Message message;          //parsed message received on zmq server port
+    private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+    public MessageHandler(ZMQ.Socket socket){
+      this.socket = socket;
+    }
+
+    void receiveMessage(){
+      byte[] bytes = socket.recv(); //this blocks
+      _logger.debug("Received bytes:[{}]", bytes.length);
+
+      Object objectRecvd = null;
+      try{
+        objectRecvd = Message.deserialize(bytes);
+      }catch (Exception e){
+        sendError(e.getMessage());
+        return;
+      }
+
+      if (!(objectRecvd instanceof Message)) {
+        sendError("Invalid message received");
+        return;
+      }
+
+      message = (Message) objectRecvd;
+
+      _logger.info("Received request [{}]", message);
+
+      if (Message.MessageType.PING == message.getType()){
+        sendPong();
+        return;
+      }
+
+      messageForBroker = true;
+    }
+
+    boolean hasMessageForBroker(){
+      return messageForBroker;
+    }
+
+    Message getMessage(){
+      return message;
+    }
+
+    void sendResponse(RpcResult<CompositeNode> result){
+      CompositeNode payload = (result != null) ? result.getResult() : null;
+
+      String recipient = null;
+      RpcRouter.RouteIdentifier routeId = null;
+
+      if (message != null) {
+        recipient = message.getSender();
+        routeId   = message.getRoute();
+      }
+
+      Message response = new Message.MessageBuilder()
+          .type(Message.MessageType.RESPONSE)
+          .sender(serverAddress)
+          .recipient(recipient)
+          .route(routeId)
+          .payload(XmlUtils.compositeNodeToXml(payload))
+          .build();
+
+      send(response);
+    }
+
+    private void sendError(String msg){
+      Message errorResponse = new Message.MessageBuilder()
+          .type(Message.MessageType.ERROR)
+          .sender(serverAddress)
+          .payload(msg)
+          .build();
+
+      send(errorResponse);
+    }
+
+    private void sendPong(){
+      Message pong = new Message.MessageBuilder()
+          .type(Message.MessageType.PONG)
+          .sender(serverAddress)
+          .build();
+
+      send(pong);
+    }
+
+    private void send(Message msg){
+      byte[] serializedMessage = null;
+      try {
+        serializedMessage = Message.serialize(msg);
+      } catch (Exception e) {
+        _logger.debug("Unexpected error during serialization of response [{}]", msg);
+        return;
+      }
+
+      if (serializedMessage != null)
+        if (socket.send(serializedMessage))
+          _logger.info("Response sent [{}]", msg);
+        else  _logger.debug("Failed to send serialized message");
+    }
+  }
 }
index 95fe99c..519791a 100644 (file)
@@ -15,8 +15,8 @@ import java.io.*;
 public class Message implements Serializable {
 
  public static enum MessageType {
-    ANNOUNCE((byte) 0),  //TODO: Remove announce, add rpc registration and deregistration
-    HEARTBEAT((byte) 1),
+    PING((byte) 0),
+    PONG((byte) 1),
     REQUEST((byte) 2),
     RESPONSE((byte) 3),
     ERROR((byte)4);
@@ -77,6 +77,7 @@ public class Message implements Serializable {
   public void setRecipient(String recipient) {
     this.recipient = recipient;
   }
+
   @Override
   public String toString() {
     return "Message{" +
index 0b26727..4ffcf3e 100644 (file)
@@ -7,15 +7,12 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc.dto;
 
-import java.io.Serializable;
-import java.net.URI;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
+import java.io.Serializable;
+
 public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
 
   private QName context;
@@ -83,4 +80,13 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QNa
     result = prime * result + (route   == null ? 0:route.hashCode());
     return result;
   }
+
+  @Override
+  public String toString() {
+    return "RouteIdentifierImpl{" +
+        "context=" + context +
+        ", type=" + type +
+        ", route=" + route +
+        '}';
+  }
 }
index 454dc37..41422fd 100644 (file)
@@ -38,7 +38,8 @@ public class SerilizationTest {
   @Test
   public void toXml() throws FileNotFoundException {
 
-    InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+    //InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+    InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/AddFlow.xml");
     StringWriter writer = new StringWriter();
 
     CompositeNode data = loadCompositeNode(xmlStream);
@@ -59,8 +60,6 @@ public class SerilizationTest {
     _logger.info("Parsed xml [{}]", writer.toString());
   }
 
-  //Note to self:  Stolen from TestUtils
-  ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
   // Figure out how to include TestUtils through pom ...was getting errors
   private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
     if (xmlInputStream == null) {
index 20cf4f6..a68ee57 100644 (file)
@@ -78,6 +78,41 @@ public class MessagingUtil {
     };
   }
 
+  public static Runnable sendAMessage(final ZMQ.Context context, final String serverAddress, final Message msg)
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    return new Runnable() {
+      @Override
+      public void run() {
+        final ZMQ.Socket socket = context.socket(ZMQ.REQ);
+        try {
+
+          socket.connect(serverAddress);
+          System.out.println(Thread.currentThread().getName() + " Sending message");
+          try {
+            socket.send(Message.serialize(msg));
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+          byte[] bytes = socket.recv();
+          Message response = null;
+          try {
+            response = (Message) Message.deserialize(bytes);
+          } catch (IOException e) {
+            e.printStackTrace();
+          } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+          }
+          System.out.println(Thread.currentThread().getName() + " Got response " + response);
+        } catch (Exception x) {
+          x.printStackTrace();
+        } finally {
+          socket.close();
+        }
+      }
+    };
+  }
+
   public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
           throws IOException, ClassNotFoundException, InterruptedException {
 
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java
new file mode 100644 (file)
index 0000000..a71ab86
--- /dev/null
@@ -0,0 +1,89 @@
+package org.opendaylight.controller.sal.connector.remoterpc.utils;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class RemoteServerTestClient {
+
+
+
+  public static void main(String args[]) throws Exception{
+    String serverAddress = "tcp://10.195.128.108:5666";
+    ZMQ.Context ctx = ZMQ.context(1);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    RemoteServerTestClient client = new RemoteServerTestClient();
+    executor.execute(
+        MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress))
+    );
+    MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress));
+
+    Thread.sleep(5000);
+    MessagingUtil.closeZmqContext(ctx);
+    executor.shutdown();
+  }
+
+  public Message createPingMessage(String serverAddress){
+    Message ping = new Message.MessageBuilder()
+        .type(Message.MessageType.PING)
+        .sender("localhost:5444")
+        .recipient(serverAddress)
+        .build();
+
+    return ping;
+  }
+  public Message createAddFlowMessage(String serverAddress ){
+
+    RpcRouter.RouteIdentifier routeIdentifier = getAddFlowRpcIdentifier();
+
+    Message addFlow = new Message.MessageBuilder()
+        .type(Message.MessageType.REQUEST)
+        .sender("localhost:5444")
+        .recipient(serverAddress)
+        .route(routeIdentifier)
+        .payload(getAddFlowPayload(1,1))
+        .build();
+
+    return addFlow;
+  }
+
+  private RpcRouter.RouteIdentifier getAddFlowRpcIdentifier(){
+    throw new UnsupportedOperationException();
+  }
+
+  private CompositeNode getAddFlowPayload(int flowId, int tableId){
+    final String xml =
+    "<flow xmlns=\"urn:opendaylight:flow:inventory\">"
+    + "<priority>5</priority>"
+    + "<flow-name>Foo</flow-name>"
+    + "<match>"
+    + "<ethernet-match>"
+    + "<ethernet-type>"
+    + "<type>2048</type>"
+    + "</ethernet-type>"
+    + "</ethernet-match>"
+    + "<ipv4-destination>10.0.10.2/24</ipv4-destination>"
+    + "</match>"
+    + "<id>" + flowId + "</id>"
+    + "<table_id>" + tableId + "</table_id>"
+    + "<instructions>"
+    + "<instruction>"
+    + "<order>0</order>"
+    + "<apply-actions>"
+    + "<action>"
+    + "<order>0</order>"
+    + "<dec-nw-ttl/>"
+    + "</action>"
+    + "</apply-actions>"
+    + "</instruction>"
+    + "</instructions>"
+    + "</flow>";
+
+    return XmlUtils.xmlToCompositeNode(xml);
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml
new file mode 100644 (file)
index 0000000..b042b8f
--- /dev/null
@@ -0,0 +1,36 @@
+<add-flow xmlns="urn:opendaylight:flow:service">
+  <input>
+    <transaction-uri>BA-7</transaction-uri>
+    <table_id>4</table_id>
+    <priority>5</priority>
+    <node>
+      /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]
+    </node>
+    <match>
+      <ipv4-destination>10.0.10.2/24</ipv4-destination>
+      <ethernet-match>
+        <ethernet-type>
+          <type>2048</type>
+        </ethernet-type>
+      </ethernet-match>
+    </match>
+    <instructions>
+      <instruction>
+        <order>0</order>
+        <apply-actions>
+          <action>
+            <order>0</order>
+            <dec-nw-ttl/>
+          </action>
+        </apply-actions>
+      </instruction>
+    </instructions>
+    <flow-table>
+      /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+    </flow-table>
+    <flow-ref>
+      /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)flow[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+    </flow-ref>
+    <flow-name>Foo</flow-name>
+  </input>
+</add-flow>
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.