Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / Sender.java
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java
deleted file mode 100644 (file)
index f53d5ad..0000000
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-
-import static com.google.common.base.Preconditions.*;
-
-/**
- * Main server thread for sending requests.
- */
-public class Sender implements Runnable{
-
-  private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
-  private final Client client;
-
-
-  
-  
-  public Sender(Client client) {
-    super();
-    this.client = client;
-  }
-
-@Override
-  public void run() {
-    _logger.info("Starting...");
-
-    try (SocketManager socketManager = new SocketManager()){
-      while (!Thread.currentThread().isInterrupted()) {
-
-        //read incoming messages from blocking queue
-        MessageWrapper request = pollForRequest();
-
-        if (request != null) {
-          processRequest(socketManager, request);
-        }
-
-        flushSockets(socketManager);
-        pollForResponse(socketManager);
-        processResponse(socketManager);
-
-      }
-    } catch(Exception t){
-      _logger.error("Exception: [{}]", t);
-      _logger.error("Stopping...");
-    }
-  }
-
-  private void processResponse(SocketManager socketManager) {
-    for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
-      // If any sockets get a response, process it
-      if (socketManager.getPoller().pollin(i)) {
-        Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
-            socketManager.getPoller().getItem(i).getSocket());
-
-        checkState(socket.isPresent(), "Managed socket not found");
-
-        MessageWrapper response = socket.get().receive();
-        _logger.debug("Received rpc response [{}]", response.getMessage());
-
-        //TODO: handle exception and introduce timeout on receiver side
-        try {
-          response.getReceiveSocket().send(Message.serialize(response.getMessage()));
-        } catch (IOException e) {
-          e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
-      }
-    }
-  }
-
-  private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
-
-    if ((request.getMessage() == null) ||
-        (request.getMessage().getRecipient() == null)) {
-      //invalid message. log and drop
-      _logger.error("Invalid request [{}]", request);
-      return;
-    }
-
-    RpcSocket socket =
-        socketManager.getManagedSocket(request.getMessage().getRecipient());
-
-    socket.send(request);
-  }
-
-  private void flushSockets(SocketManager socketManager){
-    for (RpcSocket socket : socketManager.getManagedSockets()){
-      socket.process();
-    }
-  }
-
-  private MessageWrapper pollForRequest(){
-    return client.getRequestQueue().poll();
-  }
-
-  private void pollForResponse(SocketManager socketManager){
-    try{
-      socketManager.getPoller().poll(10); //poll every 10ms
-    }catch (Throwable t) { /*Ignore and continue*/ }
-  }
-}
-
-
-/*
-SCALA
-
-package org.opendaylight.controller.sal.connector.remoterpc
-
-  import org.slf4j.{LoggerFactory, Logger}
-  import scala.collection.JavaConverters._
-  import scala.Some
-  import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
-*/
-/**
- * Main server thread for sending requests. This does not maintain any state. If the
- * thread dies, it will be restarted
- */
-/*class Sender extends Runnable {
-  private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
-
-  override def run = {
-    _logger.info("Sender starting...")
-    val socketManager = new SocketManager()
-
-    try {
-      while (!Thread.currentThread().isInterrupted) {
-        //read incoming messages from blocking queue
-        val request: MessageWrapper = Client.requestQueue.poll()
-
-        if (request != null) {
-          if ((request.message != null) &&
-            (request.message.getRecipient != null)) {
-
-            val socket = socketManager.getManagedSocket(request.message.getRecipient)
-            socket.send(request)
-          } else {
-            //invalid message. log and drop
-            _logger.error("Invalid request [{}]", request)
-          }
-        }
-
-        socketManager.getManagedSockets().asScala.map(s => s.process)
-
-        // Poll all sockets for responses every 1 sec
-        poll(socketManager)
-
-        // If any sockets get a response, process it
-        for (i <- 0 until socketManager.poller.getSize) {
-          if (socketManager.poller.pollin(i)) {
-            val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
-
-            socket match {
-              case None => //{
-                _logger.error("Could not find a managed socket for zmq socket")
-                throw new IllegalStateException("Could not find a managed socket for zmq socket")
-                //}
-              case Some(s) => {
-                val response = s.receive()
-                _logger.debug("Received rpc response [{}]", response.message)
-                response.receiveSocket.send(Message.serialize(response.message))
-              }
-            }
-          }
-        }
-
-      }
-    } catch{
-      case e:Exception => {
-        _logger.debug("Sender stopping due to exception")
-        e.printStackTrace()
-      }
-    } finally {
-      socketManager.stop
-    }
-  }
-
-  def poll(socketManager:SocketManager) = {
-    try{
-      socketManager.poller.poll(10)
-    }catch{
-      case t:Throwable => //ignore and continue
-    }
-  }
-}
-
-
-//    def newThread(r: Runnable): Thread = {
-//      val t = new RequestHandler()
-//      t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
-//      t
-//    }
-
-
-
-/**
- * Restarts the request processing server in the event of unforeseen exceptions
- */
-//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
-//  def uncaughtException(t: Thread, e: Throwable) = {
-//    _logger.error("Exception caught during request processing [{}]", e)
-//    _logger.info("Restarting request processor server...")
-//    RequestProcessor.start()
-//  }