import org.zeromq.ZMQ;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Message response = handler.handle(request);
CompositeNode payload = null;
- if ( response != null )
- payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+ if ( response != null ) {
+ _logger.info("Received response [{}]", response);
+
+ Object rawPayload = response.getPayload();
+ switch (response.getType()) {
+ case ERROR:
+ if ( rawPayload instanceof List )
+ errors = (List) rawPayload;
+ break;
+
+ case RESPONSE:
+ payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
+ break;
+
+ default:
+ errors.add(
+ RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
+ );
+ break;
+
+ }
+ }
return Rpcs.getRpcResult(true, payload, errors);
} catch (Exception e){
//otherwise first create the bridge and then send request
if ( connectedServers.containsKey(remoteServerAddress) )
return sendMessage(request, remoteServerAddress);
+
else{
workerPool.execute(new Worker(remoteServerAddress));
connectedServers.put(remoteServerAddress, remoteServerAddress);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
try {
- socket.connect( INPROC_PROTOCOL_PREFIX + address);
+ String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address;
+ socket.connect( inProcessSocketAddress );
+ _logger.debug("Sending request [{}]", request);
socket.send(Message.serialize(request));
- _logger.debug("Request sent. Waiting for reply...");
+ _logger.info("Request sent. Waiting for reply...");
byte[] reply = socket.recv(0);
- _logger.debug("Response received");
+ _logger.info("Response received");
response = (Message) Message.deserialize(reply);
+ _logger.debug("Response [{}]", response);
} finally {
socket.close();
}
*/
private class Worker implements Runnable {
private String name;
- private String remoteServer; //<servername:rpc-port>
+ private String remoteServer; //<serverip:rpc-port>
public Worker(String address){
this.name = DEFAULT_NAME + "[" + address + "]";
package org.opendaylight.controller.sal.connector.remoterpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.slf4j.Logger;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.Collection;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
while (!Thread.currentThread().isInterrupted()) {
- Message request = parseMessage(socket);
- _logger.debug("Received rpc request [{}]", request);
+ MessageHandler handler = new MessageHandler(socket);
+ handler.receiveMessage();
- if (request != null) {
- // Call broker to process the message then reply
+ if (handler.hasMessageForBroker()) {
+
+ Message request = handler.getMessage();
Future<RpcResult<CompositeNode>> rpc = null;
RpcResult<CompositeNode> result = null;
result = (rpc != null) ? rpc.get() : null;
- } catch (Exception e) {
- _logger.debug("Broker threw [{}]", e);
- }
-
- CompositeNode payload = (result != null) ? result.getResult() : null;
-
- Message response = new Message.MessageBuilder()
- .type(Message.MessageType.RESPONSE)
- .sender(serverAddress)
- .route(request.getRoute())
- .payload(XmlUtils.compositeNodeToXml(payload))
- .build();
+ handler.sendResponse(result);
- _logger.debug("Sending rpc response [{}]", response);
-
- try {
- socket.send(Message.serialize(response));
} catch (Exception e) {
- _logger.debug("rpc response send failed for message [{}]", response);
- _logger.debug("{}", e);
+ _logger.debug("Broker threw [{}]", e);
+ handler.sendError(e.getMessage());
}
}
+
}
} catch (Exception e) {
printException(e);
}
}
- /**
- * @param socket
- * @return
- */
- private Message parseMessage(ZMQ.Socket socket) throws Exception {
- byte[] bytes = socket.recv(); //this blocks
- _logger.debug("Received bytes:[{}]", bytes.length);
- return (Message) Message.deserialize(bytes);
- }
-
private void printException(Exception e) {
try (StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s)) {
super.afterExecute(r, null);
}
}
+
+ class MessageHandler{
+ private ZMQ.Socket socket;
+ private Message message; //parsed message received on zmq server port
+ private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+ public MessageHandler(ZMQ.Socket socket){
+ this.socket = socket;
+ }
+
+ void receiveMessage(){
+ byte[] bytes = socket.recv(); //this blocks
+ _logger.debug("Received bytes:[{}]", bytes.length);
+
+ Object objectRecvd = null;
+ try{
+ objectRecvd = Message.deserialize(bytes);
+ }catch (Exception e){
+ sendError(e.getMessage());
+ return;
+ }
+
+ if (!(objectRecvd instanceof Message)) {
+ sendError("Invalid message received");
+ return;
+ }
+
+ message = (Message) objectRecvd;
+
+ _logger.info("Received request [{}]", message);
+
+ if (Message.MessageType.PING == message.getType()){
+ sendPong();
+ return;
+ }
+
+ messageForBroker = true;
+ }
+
+ boolean hasMessageForBroker(){
+ return messageForBroker;
+ }
+
+ Message getMessage(){
+ return message;
+ }
+
+ void sendResponse(RpcResult<CompositeNode> result){
+ CompositeNode payload = (result != null) ? result.getResult() : null;
+
+ String recipient = null;
+ RpcRouter.RouteIdentifier routeId = null;
+
+ if (message != null) {
+ recipient = message.getSender();
+ routeId = message.getRoute();
+ }
+
+ Message response = new Message.MessageBuilder()
+ .type(Message.MessageType.RESPONSE)
+ .sender(serverAddress)
+ .recipient(recipient)
+ .route(routeId)
+ .payload(XmlUtils.compositeNodeToXml(payload))
+ .build();
+
+ send(response);
+ }
+
+ private void sendError(String msg){
+ Message errorResponse = new Message.MessageBuilder()
+ .type(Message.MessageType.ERROR)
+ .sender(serverAddress)
+ .payload(msg)
+ .build();
+
+ send(errorResponse);
+ }
+
+ private void sendPong(){
+ Message pong = new Message.MessageBuilder()
+ .type(Message.MessageType.PONG)
+ .sender(serverAddress)
+ .build();
+
+ send(pong);
+ }
+
+ private void send(Message msg){
+ byte[] serializedMessage = null;
+ try {
+ serializedMessage = Message.serialize(msg);
+ } catch (Exception e) {
+ _logger.debug("Unexpected error during serialization of response [{}]", msg);
+ return;
+ }
+
+ if (serializedMessage != null)
+ if (socket.send(serializedMessage))
+ _logger.info("Response sent [{}]", msg);
+ else _logger.debug("Failed to send serialized message");
+ }
+ }
}
public class Message implements Serializable {
public static enum MessageType {
- ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration
- HEARTBEAT((byte) 1),
+ PING((byte) 0),
+ PONG((byte) 1),
REQUEST((byte) 2),
RESPONSE((byte) 3),
ERROR((byte)4);
public void setRecipient(String recipient) {
this.recipient = recipient;
}
+
@Override
public String toString() {
return "Message{" +
*/
package org.opendaylight.controller.sal.connector.remoterpc.dto;
-import java.io.Serializable;
-import java.net.URI;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import java.io.Serializable;
+
public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
private QName context;
result = prime * result + (route == null ? 0:route.hashCode());
return result;
}
+
+ @Override
+ public String toString() {
+ return "RouteIdentifierImpl{" +
+ "context=" + context +
+ ", type=" + type +
+ ", route=" + route +
+ '}';
+ }
}
@Test
public void toXml() throws FileNotFoundException {
- InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+ //InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+ InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/AddFlow.xml");
StringWriter writer = new StringWriter();
CompositeNode data = loadCompositeNode(xmlStream);
_logger.info("Parsed xml [{}]", writer.toString());
}
- //Note to self: Stolen from TestUtils
- ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
// Figure out how to include TestUtils through pom ...was getting errors
private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
if (xmlInputStream == null) {
};
}
+ public static Runnable sendAMessage(final ZMQ.Context context, final String serverAddress, final Message msg)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ return new Runnable() {
+ @Override
+ public void run() {
+ final ZMQ.Socket socket = context.socket(ZMQ.REQ);
+ try {
+
+ socket.connect(serverAddress);
+ System.out.println(Thread.currentThread().getName() + " Sending message");
+ try {
+ socket.send(Message.serialize(msg));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ byte[] bytes = socket.recv();
+ Message response = null;
+ try {
+ response = (Message) Message.deserialize(bytes);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ System.out.println(Thread.currentThread().getName() + " Got response " + response);
+ } catch (Exception x) {
+ x.printStackTrace();
+ } finally {
+ socket.close();
+ }
+ }
+ };
+ }
+
public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
throws IOException, ClassNotFoundException, InterruptedException {
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc.utils;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class RemoteServerTestClient {
+
+
+
+ public static void main(String args[]) throws Exception{
+ String serverAddress = "tcp://10.195.128.108:5666";
+ ZMQ.Context ctx = ZMQ.context(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ RemoteServerTestClient client = new RemoteServerTestClient();
+ executor.execute(
+ MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress))
+ );
+ MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress));
+
+ Thread.sleep(5000);
+ MessagingUtil.closeZmqContext(ctx);
+ executor.shutdown();
+ }
+
+ public Message createPingMessage(String serverAddress){
+ Message ping = new Message.MessageBuilder()
+ .type(Message.MessageType.PING)
+ .sender("localhost:5444")
+ .recipient(serverAddress)
+ .build();
+
+ return ping;
+ }
+ public Message createAddFlowMessage(String serverAddress ){
+
+ RpcRouter.RouteIdentifier routeIdentifier = getAddFlowRpcIdentifier();
+
+ Message addFlow = new Message.MessageBuilder()
+ .type(Message.MessageType.REQUEST)
+ .sender("localhost:5444")
+ .recipient(serverAddress)
+ .route(routeIdentifier)
+ .payload(getAddFlowPayload(1,1))
+ .build();
+
+ return addFlow;
+ }
+
+ private RpcRouter.RouteIdentifier getAddFlowRpcIdentifier(){
+ throw new UnsupportedOperationException();
+ }
+
+ private CompositeNode getAddFlowPayload(int flowId, int tableId){
+ final String xml =
+ "<flow xmlns=\"urn:opendaylight:flow:inventory\">"
+ + "<priority>5</priority>"
+ + "<flow-name>Foo</flow-name>"
+ + "<match>"
+ + "<ethernet-match>"
+ + "<ethernet-type>"
+ + "<type>2048</type>"
+ + "</ethernet-type>"
+ + "</ethernet-match>"
+ + "<ipv4-destination>10.0.10.2/24</ipv4-destination>"
+ + "</match>"
+ + "<id>" + flowId + "</id>"
+ + "<table_id>" + tableId + "</table_id>"
+ + "<instructions>"
+ + "<instruction>"
+ + "<order>0</order>"
+ + "<apply-actions>"
+ + "<action>"
+ + "<order>0</order>"
+ + "<dec-nw-ttl/>"
+ + "</action>"
+ + "</apply-actions>"
+ + "</instruction>"
+ + "</instructions>"
+ + "</flow>";
+
+ return XmlUtils.xmlToCompositeNode(xml);
+ }
+}
--- /dev/null
+<add-flow xmlns="urn:opendaylight:flow:service">
+ <input>
+ <transaction-uri>BA-7</transaction-uri>
+ <table_id>4</table_id>
+ <priority>5</priority>
+ <node>
+ /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]
+ </node>
+ <match>
+ <ipv4-destination>10.0.10.2/24</ipv4-destination>
+ <ethernet-match>
+ <ethernet-type>
+ <type>2048</type>
+ </ethernet-type>
+ </ethernet-match>
+ </match>
+ <instructions>
+ <instruction>
+ <order>0</order>
+ <apply-actions>
+ <action>
+ <order>0</order>
+ <dec-nw-ttl/>
+ </action>
+ </apply-actions>
+ </instruction>
+ </instructions>
+ <flow-table>
+ /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+ </flow-table>
+ <flow-ref>
+ /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)flow[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+ </flow-ref>
+ <flow-name>Foo</flow-name>
+ </input>
+</add-flow>
\ No newline at end of file