X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FServerRequestHandler.java;h=a25387d297935351bbec8d3f6b0af13638f43286;hp=949e6ee9a8fb2653da07a848b414c81160aea2ab;hb=3c0667dedcd089624c4de1fdf39f9c971bdce209;hpb=5bf8e609e5f7c2f69ea58a5d0e6d7a564457b2f1 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java index 949e6ee9a8..a25387d297 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java @@ -8,6 +8,17 @@ package org.opendaylight.controller.sal.connector.remoterpc; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker; @@ -18,24 +29,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - /** * */ public class ServerRequestHandler implements AutoCloseable{ - private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class); + private final Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class); private final String DEFAULT_NAME = "remote-rpc-worker"; - private String dealerAddress; - private String serverAddress; - private int workerCount; - private ZMQ.Context context; - private Broker.ProviderSession broker; + private final String dealerAddress; + private final String serverAddress; + private final int workerCount; + private final ZMQ.Context context; + private final Broker.ProviderSession broker; private RequestHandlerThreadPool workerPool; private final AtomicInteger threadId = new AtomicInteger(); @@ -83,7 +88,7 @@ public class ServerRequestHandler implements AutoCloseable{ * Worker to handles RPC request */ private class Worker implements Runnable { - private String name; + private final String name; public Worker(int id){ this.name = DEFAULT_NAME + "-" + id; @@ -101,11 +106,12 @@ public class ServerRequestHandler implements AutoCloseable{ while (!Thread.currentThread().isInterrupted()) { - Message request = parseMessage(socket); - _logger.debug("Received rpc request [{}]", request); + MessageHandler handler = new MessageHandler(socket); + handler.receiveMessage(); + + if (handler.hasMessageForBroker()) { - if (request != null) { - // Call broker to process the message then reply + Message request = handler.getMessage(); Future> rpc = null; RpcResult result = null; @@ -117,28 +123,14 @@ public class ServerRequestHandler implements AutoCloseable{ result = (rpc != null) ? rpc.get() : null; - } catch (Exception e) { - _logger.debug("Broker threw [{}]", e); - } - - CompositeNode payload = (result != null) ? result.getResult() : null; - - Message response = new Message.MessageBuilder() - .type(Message.MessageType.RESPONSE) - .sender(serverAddress) - .route(request.getRoute()) - .payload(XmlUtils.compositeNodeToXml(payload)) - .build(); - - _logger.debug("Sending rpc response [{}]", response); + handler.sendResponse(result); - try { - socket.send(Message.serialize(response)); } catch (Exception e) { - _logger.debug("rpc response send failed for message [{}]", response); - _logger.debug("{}", e); + _logger.debug("Broker threw [{}]", e); + handler.sendError(e.getMessage()); } } + } } catch (Exception e) { printException(e); @@ -147,16 +139,6 @@ public class ServerRequestHandler implements AutoCloseable{ } } - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) throws Exception { - byte[] bytes = socket.recv(); //this blocks - _logger.debug("Received bytes:[{}]", bytes.length); - return (Message) Message.deserialize(bytes); - } - private void printException(Exception e) { try (StringWriter s = new StringWriter(); PrintWriter p = new PrintWriter(s)) { @@ -204,4 +186,107 @@ public class ServerRequestHandler implements AutoCloseable{ super.afterExecute(r, null); } } + + class MessageHandler{ + private final ZMQ.Socket socket; + private Message message; //parsed message received on zmq server port + private boolean messageForBroker = false; //if the message is valid and not a "ping" message + + public MessageHandler(ZMQ.Socket socket){ + this.socket = socket; + } + + void receiveMessage(){ + byte[] bytes = socket.recv(); //this blocks + _logger.debug("Received bytes:[{}]", bytes.length); + + Object objectRecvd = null; + try{ + objectRecvd = Message.deserialize(bytes); + }catch (Exception e){ + sendError(e.getMessage()); + return; + } + + if (!(objectRecvd instanceof Message)) { + sendError("Invalid message received"); + return; + } + + message = (Message) objectRecvd; + + _logger.info("Received request [{}]", message); + + if (Message.MessageType.PING == message.getType()){ + sendPong(); + return; + } + + messageForBroker = true; + } + + boolean hasMessageForBroker(){ + return messageForBroker; + } + + Message getMessage(){ + return message; + } + + void sendResponse(RpcResult result){ + CompositeNode payload = (result != null) ? result.getResult() : null; + + String recipient = null; + RpcRouter.RouteIdentifier routeId = null; + + if (message != null) { + recipient = message.getSender(); + routeId = message.getRoute(); + } + + Message response = new Message.MessageBuilder() + .type(Message.MessageType.RESPONSE) + .sender(serverAddress) + .recipient(recipient) + .route(routeId) + .payload(XmlUtils.compositeNodeToXml(payload)) + .build(); + + send(response); + } + + private void sendError(String msg){ + Message errorResponse = new Message.MessageBuilder() + .type(Message.MessageType.ERROR) + .sender(serverAddress) + .payload(msg) + .build(); + + send(errorResponse); + } + + private void sendPong(){ + Message pong = new Message.MessageBuilder() + .type(Message.MessageType.PONG) + .sender(serverAddress) + .build(); + + send(pong); + } + + private void send(Message msg){ + byte[] serializedMessage = null; + try { + serializedMessage = Message.serialize(msg); + } catch (Exception e) { + _logger.debug("Unexpected error during serialization of response [{}]", msg); + return; + } + + if (serializedMessage != null) + if (socket.send(serializedMessage)) + _logger.info("Response sent [{}]", msg); + else _logger.debug("Failed to send serialized message"); + } + } }