Introducing the Modification classses
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerRequestHandler.java
index 949e6ee9a8fb2653da07a848b414c81160aea2ab..a25387d297935351bbec8d3f6b0af13638f43286 100644 (file)
@@ -8,6 +8,17 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
 import org.opendaylight.controller.sal.core.api.Broker;
@@ -18,24 +29,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  *
  */
 public class ServerRequestHandler implements AutoCloseable{
 
-  private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
+  private final Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
   private final String DEFAULT_NAME = "remote-rpc-worker";
-  private String dealerAddress;
-  private String serverAddress;
-  private int workerCount;
-  private ZMQ.Context context;
-  private Broker.ProviderSession broker;
+  private final String dealerAddress;
+  private final String serverAddress;
+  private final int workerCount;
+  private final ZMQ.Context context;
+  private final Broker.ProviderSession broker;
 
   private RequestHandlerThreadPool workerPool;
   private final AtomicInteger threadId = new AtomicInteger();
@@ -83,7 +88,7 @@ public class ServerRequestHandler implements AutoCloseable{
    * Worker to handles RPC request
    */
   private class Worker implements Runnable {
-    private String name;
+    private final String name;
 
     public Worker(int id){
       this.name = DEFAULT_NAME + "-" + id;
@@ -101,11 +106,12 @@ public class ServerRequestHandler implements AutoCloseable{
 
         while (!Thread.currentThread().isInterrupted()) {
 
-          Message request = parseMessage(socket);
-          _logger.debug("Received rpc request [{}]", request);
+          MessageHandler handler = new MessageHandler(socket);
+          handler.receiveMessage();
+
+          if (handler.hasMessageForBroker()) {
 
-          if (request != null) {
-            // Call broker to process the message then reply
+            Message request = handler.getMessage();
             Future<RpcResult<CompositeNode>> rpc = null;
             RpcResult<CompositeNode> result = null;
 
@@ -117,28 +123,14 @@ public class ServerRequestHandler implements AutoCloseable{
 
               result = (rpc != null) ? rpc.get() : null;
 
-            } catch (Exception e) {
-              _logger.debug("Broker threw  [{}]", e);
-            }
-
-            CompositeNode payload = (result != null) ? result.getResult() : null;
-
-            Message response = new Message.MessageBuilder()
-                .type(Message.MessageType.RESPONSE)
-                .sender(serverAddress)
-                .route(request.getRoute())
-                .payload(XmlUtils.compositeNodeToXml(payload))
-                .build();
-
-            _logger.debug("Sending rpc response [{}]", response);
+              handler.sendResponse(result);
 
-            try {
-              socket.send(Message.serialize(response));
             } catch (Exception e) {
-              _logger.debug("rpc response send failed for message [{}]", response);
-              _logger.debug("{}", e);
+              _logger.debug("Broker threw  [{}]", e);
+              handler.sendError(e.getMessage());
             }
           }
+
         }
       } catch (Exception e) {
         printException(e);
@@ -147,16 +139,6 @@ public class ServerRequestHandler implements AutoCloseable{
       }
     }
 
-    /**
-     * @param socket
-     * @return
-     */
-    private Message parseMessage(ZMQ.Socket socket) throws Exception {
-      byte[] bytes = socket.recv(); //this blocks
-      _logger.debug("Received bytes:[{}]", bytes.length);
-      return (Message) Message.deserialize(bytes);
-    }
-
     private void printException(Exception e) {
       try (StringWriter s = new StringWriter();
            PrintWriter p = new PrintWriter(s)) {
@@ -204,4 +186,107 @@ public class ServerRequestHandler implements AutoCloseable{
       super.afterExecute(r, null);
     }
   }
+
+  class MessageHandler{
+    private final ZMQ.Socket socket;
+    private Message message;          //parsed message received on zmq server port
+    private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+    public MessageHandler(ZMQ.Socket socket){
+      this.socket = socket;
+    }
+
+    void receiveMessage(){
+      byte[] bytes = socket.recv(); //this blocks
+      _logger.debug("Received bytes:[{}]", bytes.length);
+
+      Object objectRecvd = null;
+      try{
+        objectRecvd = Message.deserialize(bytes);
+      }catch (Exception e){
+        sendError(e.getMessage());
+        return;
+      }
+
+      if (!(objectRecvd instanceof Message)) {
+        sendError("Invalid message received");
+        return;
+      }
+
+      message = (Message) objectRecvd;
+
+      _logger.info("Received request [{}]", message);
+
+      if (Message.MessageType.PING == message.getType()){
+        sendPong();
+        return;
+      }
+
+      messageForBroker = true;
+    }
+
+    boolean hasMessageForBroker(){
+      return messageForBroker;
+    }
+
+    Message getMessage(){
+      return message;
+    }
+
+    void sendResponse(RpcResult<CompositeNode> result){
+      CompositeNode payload = (result != null) ? result.getResult() : null;
+
+      String recipient = null;
+      RpcRouter.RouteIdentifier<?, ?, ?> routeId = null;
+
+      if (message != null) {
+        recipient = message.getSender();
+        routeId   = message.getRoute();
+      }
+
+      Message response = new Message.MessageBuilder()
+          .type(Message.MessageType.RESPONSE)
+          .sender(serverAddress)
+          .recipient(recipient)
+          .route(routeId)
+          .payload(XmlUtils.compositeNodeToXml(payload))
+          .build();
+
+      send(response);
+    }
+
+    private void sendError(String msg){
+      Message errorResponse = new Message.MessageBuilder()
+          .type(Message.MessageType.ERROR)
+          .sender(serverAddress)
+          .payload(msg)
+          .build();
+
+      send(errorResponse);
+    }
+
+    private void sendPong(){
+      Message pong = new Message.MessageBuilder()
+          .type(Message.MessageType.PONG)
+          .sender(serverAddress)
+          .build();
+
+      send(pong);
+    }
+
+    private void send(Message msg){
+      byte[] serializedMessage = null;
+      try {
+        serializedMessage = Message.serialize(msg);
+      } catch (Exception e) {
+        _logger.debug("Unexpected error during serialization of response [{}]", msg);
+        return;
+      }
+
+      if (serializedMessage != null)
+        if (socket.send(serializedMessage))
+          _logger.info("Response sent [{}]", msg);
+        else  _logger.debug("Failed to send serialized message");
+    }
+  }
 }