From 8ec80cfe8201adca88813d83a007417133da162e Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 14 Feb 2014 14:39:35 -0800 Subject: [PATCH] Enhanced message handling -Added support for ping/pong messages -Improved logging -Refactored code, added MessageHandler for better handling of message -Updated dependency version in pom Change-Id: I04ff607c287afbe8a2ea156a0f8cd9029c1504ab Signed-off-by: Abhishek Kumar --- .../sal/connector/remoterpc/ClientImpl.java | 25 ++- .../remoterpc/ClientRequestHandler.java | 12 +- .../remoterpc/ServerRequestHandler.java | 148 ++++++++++++++---- .../sal/connector/remoterpc/dto/Message.java | 5 +- .../remoterpc/dto/RouteIdentifierImpl.java | 16 +- .../connector/remoterpc/SerilizationTest.java | 5 +- .../remoterpc/utils/MessagingUtil.java | 35 +++++ .../utils/RemoteServerTestClient.java | 89 +++++++++++ .../src/test/resources/AddFlow.xml | 36 +++++ 9 files changed, 323 insertions(+), 48 deletions(-) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java index 30e11c0806..84df2e43f0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -140,9 +141,29 @@ public class ClientImpl implements RemoteRpcClient { Message response = handler.handle(request); CompositeNode payload = null; - if ( response != null ) - payload = XmlUtils.xmlToCompositeNode((String) response.getPayload()); + if ( response != null ) { + _logger.info("Received response [{}]", response); + + Object rawPayload = response.getPayload(); + switch (response.getType()) { + case ERROR: + if ( rawPayload instanceof List ) + errors = (List) rawPayload; + break; + + case RESPONSE: + payload = XmlUtils.xmlToCompositeNode((String) rawPayload); + break; + + default: + errors.add( + RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null) + ); + break; + + } + } return Rpcs.getRpcResult(true, payload, errors); } catch (Exception e){ diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java index f3ef4b6cae..fe70fb77be 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java @@ -90,6 +90,7 @@ class ClientRequestHandler implements AutoCloseable{ //otherwise first create the bridge and then send request if ( connectedServers.containsKey(remoteServerAddress) ) return sendMessage(request, remoteServerAddress); + else{ workerPool.execute(new Worker(remoteServerAddress)); connectedServers.put(remoteServerAddress, remoteServerAddress); @@ -105,12 +106,15 @@ class ClientRequestHandler implements AutoCloseable{ ZMQ.Socket socket = context.socket(ZMQ.REQ); try { - socket.connect( INPROC_PROTOCOL_PREFIX + address); + String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address; + socket.connect( inProcessSocketAddress ); + _logger.debug("Sending request [{}]", request); socket.send(Message.serialize(request)); - _logger.debug("Request sent. Waiting for reply..."); + _logger.info("Request sent. Waiting for reply..."); byte[] reply = socket.recv(0); - _logger.debug("Response received"); + _logger.info("Response received"); response = (Message) Message.deserialize(reply); + _logger.debug("Response [{}]", response); } finally { socket.close(); } @@ -143,7 +147,7 @@ class ClientRequestHandler implements AutoCloseable{ */ private class Worker implements Runnable { private String name; - private String remoteServer; // + private String remoteServer; // public Worker(String address){ this.name = DEFAULT_NAME + "[" + address + "]"; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java index 949e6ee9a8..2041f03afb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java @@ -8,10 +8,13 @@ package org.opendaylight.controller.sal.connector.remoterpc; +import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.slf4j.Logger; @@ -21,6 +24,7 @@ import org.zeromq.ZMQ; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Collection; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -101,11 +105,12 @@ public class ServerRequestHandler implements AutoCloseable{ while (!Thread.currentThread().isInterrupted()) { - Message request = parseMessage(socket); - _logger.debug("Received rpc request [{}]", request); + MessageHandler handler = new MessageHandler(socket); + handler.receiveMessage(); - if (request != null) { - // Call broker to process the message then reply + if (handler.hasMessageForBroker()) { + + Message request = handler.getMessage(); Future> rpc = null; RpcResult result = null; @@ -117,28 +122,14 @@ public class ServerRequestHandler implements AutoCloseable{ result = (rpc != null) ? rpc.get() : null; - } catch (Exception e) { - _logger.debug("Broker threw [{}]", e); - } - - CompositeNode payload = (result != null) ? result.getResult() : null; - - Message response = new Message.MessageBuilder() - .type(Message.MessageType.RESPONSE) - .sender(serverAddress) - .route(request.getRoute()) - .payload(XmlUtils.compositeNodeToXml(payload)) - .build(); + handler.sendResponse(result); - _logger.debug("Sending rpc response [{}]", response); - - try { - socket.send(Message.serialize(response)); } catch (Exception e) { - _logger.debug("rpc response send failed for message [{}]", response); - _logger.debug("{}", e); + _logger.debug("Broker threw [{}]", e); + handler.sendError(e.getMessage()); } } + } } catch (Exception e) { printException(e); @@ -147,16 +138,6 @@ public class ServerRequestHandler implements AutoCloseable{ } } - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) throws Exception { - byte[] bytes = socket.recv(); //this blocks - _logger.debug("Received bytes:[{}]", bytes.length); - return (Message) Message.deserialize(bytes); - } - private void printException(Exception e) { try (StringWriter s = new StringWriter(); PrintWriter p = new PrintWriter(s)) { @@ -204,4 +185,107 @@ public class ServerRequestHandler implements AutoCloseable{ super.afterExecute(r, null); } } + + class MessageHandler{ + private ZMQ.Socket socket; + private Message message; //parsed message received on zmq server port + private boolean messageForBroker = false; //if the message is valid and not a "ping" message + + public MessageHandler(ZMQ.Socket socket){ + this.socket = socket; + } + + void receiveMessage(){ + byte[] bytes = socket.recv(); //this blocks + _logger.debug("Received bytes:[{}]", bytes.length); + + Object objectRecvd = null; + try{ + objectRecvd = Message.deserialize(bytes); + }catch (Exception e){ + sendError(e.getMessage()); + return; + } + + if (!(objectRecvd instanceof Message)) { + sendError("Invalid message received"); + return; + } + + message = (Message) objectRecvd; + + _logger.info("Received request [{}]", message); + + if (Message.MessageType.PING == message.getType()){ + sendPong(); + return; + } + + messageForBroker = true; + } + + boolean hasMessageForBroker(){ + return messageForBroker; + } + + Message getMessage(){ + return message; + } + + void sendResponse(RpcResult result){ + CompositeNode payload = (result != null) ? result.getResult() : null; + + String recipient = null; + RpcRouter.RouteIdentifier routeId = null; + + if (message != null) { + recipient = message.getSender(); + routeId = message.getRoute(); + } + + Message response = new Message.MessageBuilder() + .type(Message.MessageType.RESPONSE) + .sender(serverAddress) + .recipient(recipient) + .route(routeId) + .payload(XmlUtils.compositeNodeToXml(payload)) + .build(); + + send(response); + } + + private void sendError(String msg){ + Message errorResponse = new Message.MessageBuilder() + .type(Message.MessageType.ERROR) + .sender(serverAddress) + .payload(msg) + .build(); + + send(errorResponse); + } + + private void sendPong(){ + Message pong = new Message.MessageBuilder() + .type(Message.MessageType.PONG) + .sender(serverAddress) + .build(); + + send(pong); + } + + private void send(Message msg){ + byte[] serializedMessage = null; + try { + serializedMessage = Message.serialize(msg); + } catch (Exception e) { + _logger.debug("Unexpected error during serialization of response [{}]", msg); + return; + } + + if (serializedMessage != null) + if (socket.send(serializedMessage)) + _logger.info("Response sent [{}]", msg); + else _logger.debug("Failed to send serialized message"); + } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java index 95fe99c81c..519791a195 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java @@ -15,8 +15,8 @@ import java.io.*; public class Message implements Serializable { public static enum MessageType { - ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration - HEARTBEAT((byte) 1), + PING((byte) 0), + PONG((byte) 1), REQUEST((byte) 2), RESPONSE((byte) 3), ERROR((byte)4); @@ -77,6 +77,7 @@ public class Message implements Serializable { public void setRecipient(String recipient) { this.recipient = recipient; } + @Override public String toString() { return "Message{" + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java index 0b26727155..4ffcf3e099 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java @@ -7,15 +7,12 @@ */ package org.opendaylight.controller.sal.connector.remoterpc.dto; -import java.io.Serializable; -import java.net.URI; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import java.io.Serializable; + public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier,Serializable { private QName context; @@ -83,4 +80,13 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier" + + "5" + + "Foo" + + "" + + "" + + "" + + "2048" + + "" + + "" + + "10.0.10.2/24" + + "" + + "" + flowId + "" + + "" + tableId + "" + + "" + + "" + + "0" + + "" + + "" + + "0" + + "" + + "" + + "" + + "" + + "" + + ""; + + return XmlUtils.xmlToCompositeNode(xml); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml new file mode 100644 index 0000000000..b042b8f65b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml @@ -0,0 +1,36 @@ + + + BA-7 + 4 + 5 + + /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}] + + + 10.0.10.2/24 + + + 2048 + + + + + + 0 + + + 0 + + + + + + + /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}] + + + /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)flow[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}] + + Foo + + \ No newline at end of file -- 2.36.6