Implementation for enabling remote rpc calls between 2 instances of md-sal
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / Sender.java
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java
new file mode 100644 (file)
index 0000000..f53d5ad
--- /dev/null
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * Main server thread for sending requests.
+ */
+public class Sender implements Runnable{
+
+  private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
+  private final Client client;
+
+
+  
+  
+  public Sender(Client client) {
+    super();
+    this.client = client;
+  }
+
+@Override
+  public void run() {
+    _logger.info("Starting...");
+
+    try (SocketManager socketManager = new SocketManager()){
+      while (!Thread.currentThread().isInterrupted()) {
+
+        //read incoming messages from blocking queue
+        MessageWrapper request = pollForRequest();
+
+        if (request != null) {
+          processRequest(socketManager, request);
+        }
+
+        flushSockets(socketManager);
+        pollForResponse(socketManager);
+        processResponse(socketManager);
+
+      }
+    } catch(Exception t){
+      _logger.error("Exception: [{}]", t);
+      _logger.error("Stopping...");
+    }
+  }
+
+  private void processResponse(SocketManager socketManager) {
+    for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
+      // If any sockets get a response, process it
+      if (socketManager.getPoller().pollin(i)) {
+        Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
+            socketManager.getPoller().getItem(i).getSocket());
+
+        checkState(socket.isPresent(), "Managed socket not found");
+
+        MessageWrapper response = socket.get().receive();
+        _logger.debug("Received rpc response [{}]", response.getMessage());
+
+        //TODO: handle exception and introduce timeout on receiver side
+        try {
+          response.getReceiveSocket().send(Message.serialize(response.getMessage()));
+        } catch (IOException e) {
+          e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+      }
+    }
+  }
+
+  private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
+
+    if ((request.getMessage() == null) ||
+        (request.getMessage().getRecipient() == null)) {
+      //invalid message. log and drop
+      _logger.error("Invalid request [{}]", request);
+      return;
+    }
+
+    RpcSocket socket =
+        socketManager.getManagedSocket(request.getMessage().getRecipient());
+
+    socket.send(request);
+  }
+
+  private void flushSockets(SocketManager socketManager){
+    for (RpcSocket socket : socketManager.getManagedSockets()){
+      socket.process();
+    }
+  }
+
+  private MessageWrapper pollForRequest(){
+    return client.getRequestQueue().poll();
+  }
+
+  private void pollForResponse(SocketManager socketManager){
+    try{
+      socketManager.getPoller().poll(10); //poll every 10ms
+    }catch (Throwable t) { /*Ignore and continue*/ }
+  }
+}
+
+
+/*
+SCALA
+
+package org.opendaylight.controller.sal.connector.remoterpc
+
+  import org.slf4j.{LoggerFactory, Logger}
+  import scala.collection.JavaConverters._
+  import scala.Some
+  import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
+*/
+/**
+ * Main server thread for sending requests. This does not maintain any state. If the
+ * thread dies, it will be restarted
+ */
+/*class Sender extends Runnable {
+  private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
+
+  override def run = {
+    _logger.info("Sender starting...")
+    val socketManager = new SocketManager()
+
+    try {
+      while (!Thread.currentThread().isInterrupted) {
+        //read incoming messages from blocking queue
+        val request: MessageWrapper = Client.requestQueue.poll()
+
+        if (request != null) {
+          if ((request.message != null) &&
+            (request.message.getRecipient != null)) {
+
+            val socket = socketManager.getManagedSocket(request.message.getRecipient)
+            socket.send(request)
+          } else {
+            //invalid message. log and drop
+            _logger.error("Invalid request [{}]", request)
+          }
+        }
+
+        socketManager.getManagedSockets().asScala.map(s => s.process)
+
+        // Poll all sockets for responses every 1 sec
+        poll(socketManager)
+
+        // If any sockets get a response, process it
+        for (i <- 0 until socketManager.poller.getSize) {
+          if (socketManager.poller.pollin(i)) {
+            val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
+
+            socket match {
+              case None => //{
+                _logger.error("Could not find a managed socket for zmq socket")
+                throw new IllegalStateException("Could not find a managed socket for zmq socket")
+                //}
+              case Some(s) => {
+                val response = s.receive()
+                _logger.debug("Received rpc response [{}]", response.message)
+                response.receiveSocket.send(Message.serialize(response.message))
+              }
+            }
+          }
+        }
+
+      }
+    } catch{
+      case e:Exception => {
+        _logger.debug("Sender stopping due to exception")
+        e.printStackTrace()
+      }
+    } finally {
+      socketManager.stop
+    }
+  }
+
+  def poll(socketManager:SocketManager) = {
+    try{
+      socketManager.poller.poll(10)
+    }catch{
+      case t:Throwable => //ignore and continue
+    }
+  }
+}
+
+
+//    def newThread(r: Runnable): Thread = {
+//      val t = new RequestHandler()
+//      t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
+//      t
+//    }
+
+
+
+/**
+ * Restarts the request processing server in the event of unforeseen exceptions
+ */
+//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
+//  def uncaughtException(t: Thread, e: Throwable) = {
+//    _logger.error("Exception caught during request processing [{}]", e)
+//    _logger.info("Restarting request processor server...")
+//    RequestProcessor.start()
+//  }