--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Preconditions.*;
+
+/**
+ * Main server thread for sending requests.
+ */
+public class Sender implements Runnable{
+
+ private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
+ private final Client client;
+
+
+
+
+ public Sender(Client client) {
+ super();
+ this.client = client;
+ }
+
+@Override
+ public void run() {
+ _logger.info("Starting...");
+
+ try (SocketManager socketManager = new SocketManager()){
+ while (!Thread.currentThread().isInterrupted()) {
+
+ //read incoming messages from blocking queue
+ MessageWrapper request = pollForRequest();
+
+ if (request != null) {
+ processRequest(socketManager, request);
+ }
+
+ flushSockets(socketManager);
+ pollForResponse(socketManager);
+ processResponse(socketManager);
+
+ }
+ } catch(Exception t){
+ _logger.error("Exception: [{}]", t);
+ _logger.error("Stopping...");
+ }
+ }
+
+ private void processResponse(SocketManager socketManager) {
+ for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
+ // If any sockets get a response, process it
+ if (socketManager.getPoller().pollin(i)) {
+ Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
+ socketManager.getPoller().getItem(i).getSocket());
+
+ checkState(socket.isPresent(), "Managed socket not found");
+
+ MessageWrapper response = socket.get().receive();
+ _logger.debug("Received rpc response [{}]", response.getMessage());
+
+ //TODO: handle exception and introduce timeout on receiver side
+ try {
+ response.getReceiveSocket().send(Message.serialize(response.getMessage()));
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+
+ private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
+
+ if ((request.getMessage() == null) ||
+ (request.getMessage().getRecipient() == null)) {
+ //invalid message. log and drop
+ _logger.error("Invalid request [{}]", request);
+ return;
+ }
+
+ RpcSocket socket =
+ socketManager.getManagedSocket(request.getMessage().getRecipient());
+
+ socket.send(request);
+ }
+
+ private void flushSockets(SocketManager socketManager){
+ for (RpcSocket socket : socketManager.getManagedSockets()){
+ socket.process();
+ }
+ }
+
+ private MessageWrapper pollForRequest(){
+ return client.getRequestQueue().poll();
+ }
+
+ private void pollForResponse(SocketManager socketManager){
+ try{
+ socketManager.getPoller().poll(10); //poll every 10ms
+ }catch (Throwable t) { /*Ignore and continue*/ }
+ }
+}
+
+
+/*
+SCALA
+
+package org.opendaylight.controller.sal.connector.remoterpc
+
+ import org.slf4j.{LoggerFactory, Logger}
+ import scala.collection.JavaConverters._
+ import scala.Some
+ import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
+*/
+/**
+ * Main server thread for sending requests. This does not maintain any state. If the
+ * thread dies, it will be restarted
+ */
+/*class Sender extends Runnable {
+ private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
+
+ override def run = {
+ _logger.info("Sender starting...")
+ val socketManager = new SocketManager()
+
+ try {
+ while (!Thread.currentThread().isInterrupted) {
+ //read incoming messages from blocking queue
+ val request: MessageWrapper = Client.requestQueue.poll()
+
+ if (request != null) {
+ if ((request.message != null) &&
+ (request.message.getRecipient != null)) {
+
+ val socket = socketManager.getManagedSocket(request.message.getRecipient)
+ socket.send(request)
+ } else {
+ //invalid message. log and drop
+ _logger.error("Invalid request [{}]", request)
+ }
+ }
+
+ socketManager.getManagedSockets().asScala.map(s => s.process)
+
+ // Poll all sockets for responses every 1 sec
+ poll(socketManager)
+
+ // If any sockets get a response, process it
+ for (i <- 0 until socketManager.poller.getSize) {
+ if (socketManager.poller.pollin(i)) {
+ val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
+
+ socket match {
+ case None => //{
+ _logger.error("Could not find a managed socket for zmq socket")
+ throw new IllegalStateException("Could not find a managed socket for zmq socket")
+ //}
+ case Some(s) => {
+ val response = s.receive()
+ _logger.debug("Received rpc response [{}]", response.message)
+ response.receiveSocket.send(Message.serialize(response.message))
+ }
+ }
+ }
+ }
+
+ }
+ } catch{
+ case e:Exception => {
+ _logger.debug("Sender stopping due to exception")
+ e.printStackTrace()
+ }
+ } finally {
+ socketManager.stop
+ }
+ }
+
+ def poll(socketManager:SocketManager) = {
+ try{
+ socketManager.poller.poll(10)
+ }catch{
+ case t:Throwable => //ignore and continue
+ }
+ }
+}
+
+
+// def newThread(r: Runnable): Thread = {
+// val t = new RequestHandler()
+// t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
+// t
+// }
+
+
+
+/**
+ * Restarts the request processing server in the event of unforeseen exceptions
+ */
+//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
+// def uncaughtException(t: Thread, e: Throwable) = {
+// _logger.error("Exception caught during request processing [{}]", e)
+// _logger.info("Restarting request processor server...")
+// RequestProcessor.start()
+// }