/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.connector.remoterpc; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.google.common.base.Preconditions.*; /** * Main server thread for sending requests. */ public class Sender implements Runnable{ private final static Logger _logger = LoggerFactory.getLogger(Sender.class); private final Client client; public Sender(Client client) { super(); this.client = client; } @Override public void run() { _logger.info("Starting..."); try (SocketManager socketManager = new SocketManager()){ while (!Thread.currentThread().isInterrupted()) { //read incoming messages from blocking queue MessageWrapper request = pollForRequest(); if (request != null) { processRequest(socketManager, request); } flushSockets(socketManager); pollForResponse(socketManager); processResponse(socketManager); } } catch(Exception t){ _logger.error("Exception: [{}]", t); _logger.error("Stopping..."); } } private void processResponse(SocketManager socketManager) { for (int i = 0; i < socketManager.getPoller().getSize(); i++) { // If any sockets get a response, process it if (socketManager.getPoller().pollin(i)) { Optional socket = socketManager.getManagedSocketFor( socketManager.getPoller().getItem(i).getSocket()); checkState(socket.isPresent(), "Managed socket not found"); MessageWrapper response = socket.get().receive(); _logger.debug("Received rpc response [{}]", response.getMessage()); //TODO: handle exception and introduce timeout on receiver side try { response.getReceiveSocket().send(Message.serialize(response.getMessage())); } catch (IOException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } } } private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException { if ((request.getMessage() == null) || (request.getMessage().getRecipient() == null)) { //invalid message. log and drop _logger.error("Invalid request [{}]", request); return; } RpcSocket socket = socketManager.getManagedSocket(request.getMessage().getRecipient()); socket.send(request); } private void flushSockets(SocketManager socketManager){ for (RpcSocket socket : socketManager.getManagedSockets()){ socket.process(); } } private MessageWrapper pollForRequest(){ return client.getRequestQueue().poll(); } private void pollForResponse(SocketManager socketManager){ try{ socketManager.getPoller().poll(10); //poll every 10ms }catch (Throwable t) { /*Ignore and continue*/ } } } /* SCALA package org.opendaylight.controller.sal.connector.remoterpc import org.slf4j.{LoggerFactory, Logger} import scala.collection.JavaConverters._ import scala.Some import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message} */ /** * Main server thread for sending requests. This does not maintain any state. If the * thread dies, it will be restarted */ /*class Sender extends Runnable { private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass()) override def run = { _logger.info("Sender starting...") val socketManager = new SocketManager() try { while (!Thread.currentThread().isInterrupted) { //read incoming messages from blocking queue val request: MessageWrapper = Client.requestQueue.poll() if (request != null) { if ((request.message != null) && (request.message.getRecipient != null)) { val socket = socketManager.getManagedSocket(request.message.getRecipient) socket.send(request) } else { //invalid message. log and drop _logger.error("Invalid request [{}]", request) } } socketManager.getManagedSockets().asScala.map(s => s.process) // Poll all sockets for responses every 1 sec poll(socketManager) // If any sockets get a response, process it for (i <- 0 until socketManager.poller.getSize) { if (socketManager.poller.pollin(i)) { val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket) socket match { case None => //{ _logger.error("Could not find a managed socket for zmq socket") throw new IllegalStateException("Could not find a managed socket for zmq socket") //} case Some(s) => { val response = s.receive() _logger.debug("Received rpc response [{}]", response.message) response.receiveSocket.send(Message.serialize(response.message)) } } } } } } catch{ case e:Exception => { _logger.debug("Sender stopping due to exception") e.printStackTrace() } } finally { socketManager.stop } } def poll(socketManager:SocketManager) = { try{ socketManager.poller.poll(10) }catch{ case t:Throwable => //ignore and continue } } } // def newThread(r: Runnable): Thread = { // val t = new RequestHandler() // t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler) // t // } /** * Restarts the request processing server in the event of unforeseen exceptions */ //private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler { // def uncaughtException(t: Thread, e: Throwable) = { // _logger.error("Exception caught during request processing [{}]", e) // _logger.info("Restarting request processor server...") // RequestProcessor.start() // }