X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FServerRequestHandler.java;h=2041f03afba07c58146c1c2d449d3e1f5d2ccc3d;hb=66249d6ccc367fad055a269f561860d2d96af385;hp=949e6ee9a8fb2653da07a848b414c81160aea2ab;hpb=054306c1c3b582b4448db31c2058c7282246a258;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java index 949e6ee9a8..2041f03afb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java @@ -8,10 +8,13 @@ package org.opendaylight.controller.sal.connector.remoterpc; +import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.slf4j.Logger; @@ -21,6 +24,7 @@ import org.zeromq.ZMQ; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Collection; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -101,11 +105,12 @@ public class ServerRequestHandler implements AutoCloseable{ while (!Thread.currentThread().isInterrupted()) { - Message request = parseMessage(socket); - _logger.debug("Received rpc request [{}]", request); + MessageHandler handler = new MessageHandler(socket); + handler.receiveMessage(); - if (request != null) { - // Call broker to process the message then reply + if (handler.hasMessageForBroker()) { + + Message request = handler.getMessage(); Future> rpc = null; RpcResult result = null; @@ -117,28 +122,14 @@ public class ServerRequestHandler implements AutoCloseable{ result = (rpc != null) ? rpc.get() : null; - } catch (Exception e) { - _logger.debug("Broker threw [{}]", e); - } - - CompositeNode payload = (result != null) ? result.getResult() : null; - - Message response = new Message.MessageBuilder() - .type(Message.MessageType.RESPONSE) - .sender(serverAddress) - .route(request.getRoute()) - .payload(XmlUtils.compositeNodeToXml(payload)) - .build(); + handler.sendResponse(result); - _logger.debug("Sending rpc response [{}]", response); - - try { - socket.send(Message.serialize(response)); } catch (Exception e) { - _logger.debug("rpc response send failed for message [{}]", response); - _logger.debug("{}", e); + _logger.debug("Broker threw [{}]", e); + handler.sendError(e.getMessage()); } } + } } catch (Exception e) { printException(e); @@ -147,16 +138,6 @@ public class ServerRequestHandler implements AutoCloseable{ } } - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) throws Exception { - byte[] bytes = socket.recv(); //this blocks - _logger.debug("Received bytes:[{}]", bytes.length); - return (Message) Message.deserialize(bytes); - } - private void printException(Exception e) { try (StringWriter s = new StringWriter(); PrintWriter p = new PrintWriter(s)) { @@ -204,4 +185,107 @@ public class ServerRequestHandler implements AutoCloseable{ super.afterExecute(r, null); } } + + class MessageHandler{ + private ZMQ.Socket socket; + private Message message; //parsed message received on zmq server port + private boolean messageForBroker = false; //if the message is valid and not a "ping" message + + public MessageHandler(ZMQ.Socket socket){ + this.socket = socket; + } + + void receiveMessage(){ + byte[] bytes = socket.recv(); //this blocks + _logger.debug("Received bytes:[{}]", bytes.length); + + Object objectRecvd = null; + try{ + objectRecvd = Message.deserialize(bytes); + }catch (Exception e){ + sendError(e.getMessage()); + return; + } + + if (!(objectRecvd instanceof Message)) { + sendError("Invalid message received"); + return; + } + + message = (Message) objectRecvd; + + _logger.info("Received request [{}]", message); + + if (Message.MessageType.PING == message.getType()){ + sendPong(); + return; + } + + messageForBroker = true; + } + + boolean hasMessageForBroker(){ + return messageForBroker; + } + + Message getMessage(){ + return message; + } + + void sendResponse(RpcResult result){ + CompositeNode payload = (result != null) ? result.getResult() : null; + + String recipient = null; + RpcRouter.RouteIdentifier routeId = null; + + if (message != null) { + recipient = message.getSender(); + routeId = message.getRoute(); + } + + Message response = new Message.MessageBuilder() + .type(Message.MessageType.RESPONSE) + .sender(serverAddress) + .recipient(recipient) + .route(routeId) + .payload(XmlUtils.compositeNodeToXml(payload)) + .build(); + + send(response); + } + + private void sendError(String msg){ + Message errorResponse = new Message.MessageBuilder() + .type(Message.MessageType.ERROR) + .sender(serverAddress) + .payload(msg) + .build(); + + send(errorResponse); + } + + private void sendPong(){ + Message pong = new Message.MessageBuilder() + .type(Message.MessageType.PONG) + .sender(serverAddress) + .build(); + + send(pong); + } + + private void send(Message msg){ + byte[] serializedMessage = null; + try { + serializedMessage = Message.serialize(msg); + } catch (Exception e) { + _logger.debug("Unexpected error during serialization of response [{}]", msg); + return; + } + + if (serializedMessage != null) + if (socket.send(serializedMessage)) + _logger.info("Response sent [{}]", msg); + else _logger.debug("Failed to send serialized message"); + } + } }