package org.opendaylight.controller.sal.connector.remoterpc;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.controller.sal.core.api.Broker;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
*
*/
public class ServerRequestHandler implements AutoCloseable{
- private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
+ private final Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
private final String DEFAULT_NAME = "remote-rpc-worker";
- private String dealerAddress;
- private String serverAddress;
- private int workerCount;
- private ZMQ.Context context;
- private Broker.ProviderSession broker;
+ private final String dealerAddress;
+ private final String serverAddress;
+ private final int workerCount;
+ private final ZMQ.Context context;
+ private final Broker.ProviderSession broker;
private RequestHandlerThreadPool workerPool;
private final AtomicInteger threadId = new AtomicInteger();
* Worker to handles RPC request
*/
private class Worker implements Runnable {
- private String name;
+ private final String name;
public Worker(int id){
this.name = DEFAULT_NAME + "-" + id;
while (!Thread.currentThread().isInterrupted()) {
- Message request = parseMessage(socket);
- _logger.debug("Received rpc request [{}]", request);
+ MessageHandler handler = new MessageHandler(socket);
+ handler.receiveMessage();
+
+ if (handler.hasMessageForBroker()) {
- if (request != null) {
- // Call broker to process the message then reply
+ Message request = handler.getMessage();
Future<RpcResult<CompositeNode>> rpc = null;
RpcResult<CompositeNode> result = null;
result = (rpc != null) ? rpc.get() : null;
- } catch (Exception e) {
- _logger.debug("Broker threw [{}]", e);
- }
-
- CompositeNode payload = (result != null) ? result.getResult() : null;
-
- Message response = new Message.MessageBuilder()
- .type(Message.MessageType.RESPONSE)
- .sender(serverAddress)
- .route(request.getRoute())
- .payload(XmlUtils.compositeNodeToXml(payload))
- .build();
-
- _logger.debug("Sending rpc response [{}]", response);
+ handler.sendResponse(result);
- try {
- socket.send(Message.serialize(response));
} catch (Exception e) {
- _logger.debug("rpc response send failed for message [{}]", response);
- _logger.debug("{}", e);
+ _logger.debug("Broker threw [{}]", e);
+ handler.sendError(e.getMessage());
}
}
+
}
} catch (Exception e) {
printException(e);
}
}
- /**
- * @param socket
- * @return
- */
- private Message parseMessage(ZMQ.Socket socket) throws Exception {
- byte[] bytes = socket.recv(); //this blocks
- _logger.debug("Received bytes:[{}]", bytes.length);
- return (Message) Message.deserialize(bytes);
- }
-
private void printException(Exception e) {
try (StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s)) {
super.afterExecute(r, null);
}
}
+
+ class MessageHandler{
+ private final ZMQ.Socket socket;
+ private Message message; //parsed message received on zmq server port
+ private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+ public MessageHandler(ZMQ.Socket socket){
+ this.socket = socket;
+ }
+
+ void receiveMessage(){
+ byte[] bytes = socket.recv(); //this blocks
+ _logger.debug("Received bytes:[{}]", bytes.length);
+
+ Object objectRecvd = null;
+ try{
+ objectRecvd = Message.deserialize(bytes);
+ }catch (Exception e){
+ sendError(e.getMessage());
+ return;
+ }
+
+ if (!(objectRecvd instanceof Message)) {
+ sendError("Invalid message received");
+ return;
+ }
+
+ message = (Message) objectRecvd;
+
+ _logger.info("Received request [{}]", message);
+
+ if (Message.MessageType.PING == message.getType()){
+ sendPong();
+ return;
+ }
+
+ messageForBroker = true;
+ }
+
+ boolean hasMessageForBroker(){
+ return messageForBroker;
+ }
+
+ Message getMessage(){
+ return message;
+ }
+
+ void sendResponse(RpcResult<CompositeNode> result){
+ CompositeNode payload = (result != null) ? result.getResult() : null;
+
+ String recipient = null;
+ RpcRouter.RouteIdentifier<?, ?, ?> routeId = null;
+
+ if (message != null) {
+ recipient = message.getSender();
+ routeId = message.getRoute();
+ }
+
+ Message response = new Message.MessageBuilder()
+ .type(Message.MessageType.RESPONSE)
+ .sender(serverAddress)
+ .recipient(recipient)
+ .route(routeId)
+ .payload(XmlUtils.compositeNodeToXml(payload))
+ .build();
+
+ send(response);
+ }
+
+ private void sendError(String msg){
+ Message errorResponse = new Message.MessageBuilder()
+ .type(Message.MessageType.ERROR)
+ .sender(serverAddress)
+ .payload(msg)
+ .build();
+
+ send(errorResponse);
+ }
+
+ private void sendPong(){
+ Message pong = new Message.MessageBuilder()
+ .type(Message.MessageType.PONG)
+ .sender(serverAddress)
+ .build();
+
+ send(pong);
+ }
+
+ private void send(Message msg){
+ byte[] serializedMessage = null;
+ try {
+ serializedMessage = Message.serialize(msg);
+ } catch (Exception e) {
+ _logger.debug("Unexpected error during serialization of response [{}]", msg);
+ return;
+ }
+
+ if (serializedMessage != null)
+ if (socket.send(serializedMessage))
+ _logger.info("Response sent [{}]", msg);
+ else _logger.debug("Failed to send serialized message");
+ }
+ }
}