2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
13 import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 import org.zeromq.ZMQ;
18 import java.io.IOException;
19 import java.util.concurrent.TimeoutException;
21 import static com.google.common.base.Preconditions.*;
24 * Main server thread for sending requests.
26 public class Sender implements Runnable{
// Shared SLF4J logger for this class.
28 private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
// Client whose request queue is drained by pollForRequest().
29 private final Client client;
// NOTE(review): the constructor body is not visible in this listing; presumably it
// stores `client` in the field above -- confirm.
34 public Sender(Client client) {
// Main loop (the enclosing method header is not visible in this listing):
// log start-up, then keep working until the current thread is interrupted.
41 _logger.info("Starting...");
// try-with-resources: the SocketManager is closed automatically when this block exits.
43 try (SocketManager socketManager = new SocketManager()){
44 while (!Thread.currentThread().isInterrupted()) {
46 //read incoming messages from blocking queue
47 MessageWrapper request = pollForRequest();
// A null result means there was nothing to take from the queue on this pass.
49 if (request != null) {
50 processRequest(socketManager, request);
// Service the managed sockets: flush them, poll for readiness, then handle
// whatever responses the poll reported.
53 flushSockets(socketManager);
54 pollForResponse(socketManager);
55 processResponse(socketManager);
// Report the throwable `t` and note that the sender is stopping (both at ERROR level).
59 _logger.error("Exception: [{}]", t);
60 _logger.error("Stopping...");
// Walks every item registered with the poller and, for each one that has input
// pending, receives the response and forwards it on the response's receive socket.
64 private void processResponse(SocketManager socketManager) {
65 for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
66 // If any sockets get a response, process it
67 if (socketManager.getPoller().pollin(i)) {
// Map the raw socket held by poller item i back to its managed RpcSocket wrapper.
68 Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
69 socketManager.getPoller().getItem(i).getSocket());
// Guava checkState: throws IllegalStateException when no managed socket was found,
// so the socket.get() below is only reached when a value is present.
71 checkState(socket.isPresent(), "Managed socket not found");
73 MessageWrapper response = socket.get().receive();
74 _logger.debug("Received rpc response [{}]", response.getMessage());
76 //TODO: handle exception and introduce timeout on receiver side
// Serialize the received message and send it on the response's receive socket.
78 response.getReceiveSocket().send(Message.serialize(response.getMessage()));
79 } catch (IOException e) {
80 e.printStackTrace(); // NOTE(review): the IOException is only printed via printStackTrace(); it is not reported through _logger and is not rethrown.
// Validates an outgoing request and looks up the managed socket for its recipient.
// Declared to throw TimeoutException.
// NOTE(review): what happens with the looked-up socket is not visible in this
// listing -- confirm against the full source.
86 private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
// A request without a message, or whose message has no recipient, is treated as invalid.
88 if ((request.getMessage() == null) ||
89 (request.getMessage().getRecipient() == null)) {
90 //invalid message. log and drop
91 _logger.error("Invalid request [{}]", request);
96 socketManager.getManagedSocket(request.getMessage().getRecipient());
// Iterates over every socket the manager currently tracks.
// NOTE(review): the per-socket action (loop body) is not visible in this listing.
101 private void flushSockets(SocketManager socketManager){
102 for (RpcSocket socket : socketManager.getManagedSockets()){
// Returns whatever poll() on the client's request queue yields; the caller
// treats a null result as "no request".
107 private MessageWrapper pollForRequest(){
108 return client.getRequestQueue().poll();
// Polls the socket manager's poller with a timeout argument of 10. Any Throwable
// raised by the poll is deliberately swallowed so the caller keeps running.
111 private void pollForResponse(SocketManager socketManager){
113 socketManager.getPoller().poll(10); //poll every 10ms
114 }catch (Throwable t) { /*Ignore and continue*/ }
122 package org.opendaylight.controller.sal.connector.remoterpc
124 import org.slf4j.{LoggerFactory, Logger}
125 import scala.collection.JavaConverters._
127 import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
130 * Main server thread for sending requests. This does not maintain any state. If the
131 * thread dies, it will be restarted.
133 /*class Sender extends Runnable {
134 private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
137 _logger.info("Sender starting...")
138 val socketManager = new SocketManager()
141 while (!Thread.currentThread().isInterrupted) {
142 //read incoming messages from blocking queue
143 val request: MessageWrapper = Client.requestQueue.poll()
145 if (request != null) {
146 if ((request.message != null) &&
147 (request.message.getRecipient != null)) {
149 val socket = socketManager.getManagedSocket(request.message.getRecipient)
152 //invalid message. log and drop
153 _logger.error("Invalid request [{}]", request)
157 socketManager.getManagedSockets().asScala.map(s => s.process)
159 // Poll all sockets for responses (the poll helper below passes a timeout of 10, not 1 sec)
162 // If any sockets get a response, process it
163 for (i <- 0 until socketManager.poller.getSize) {
164 if (socketManager.poller.pollin(i)) {
165 val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
169 _logger.error("Could not find a managed socket for zmq socket")
170 throw new IllegalStateException("Could not find a managed socket for zmq socket")
173 val response = s.receive()
174 _logger.debug("Received rpc response [{}]", response.message)
175 response.receiveSocket.send(Message.serialize(response.message))
183 case e:Exception => {
184 _logger.debug("Sender stopping due to exception")
192 def poll(socketManager:SocketManager) = {
194 socketManager.poller.poll(10)
196 case t:Throwable => //ignore and continue
202 // def newThread(r: Runnable): Thread = {
203 // val t = new RequestHandler()
204 // t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
211 * Restarts the request processing server in the event of unforeseen exceptions
213 //private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
214 // def uncaughtException(t: Thread, e: Throwable) = {
215 // _logger.error("Exception caught during request processing [{}]", e)
216 // _logger.info("Restarting request processor server...")
217 // RequestProcessor.start()