X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-zeromq-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fzeromq%2FZeroMqRpcRouter.java;fp=opendaylight%2Fmd-sal%2Fsal-zeromq-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fzeromq%2FZeroMqRpcRouter.java;h=7e5efda3625cbc64d78737c563de5375a188c7d5;hb=12762ff78528a8e6aa017f78121f13c21a7aea7f;hp=0000000000000000000000000000000000000000;hpb=9139d6ad1d16ba323d5fef0f71fdfde83b679125;p=controller.git diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java new file mode 100644 index 0000000000..7e5efda362 --- /dev/null +++ b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/zeromq/ZeroMqRpcRouter.java @@ -0,0 +1,337 @@ +package org.opendaylight.controller.sal.connector.zeromq; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.*; +import java.util.concurrent.*; + +public class ZeroMqRpcRouter implements RpcRouter { + + private ExecutorService serverPool; + private static ExecutorService handlersPool; + + private Map, String> routingTable; + + private ProviderSession brokerSession; + + private ZMQ.Context context; + private ZMQ.Socket publisher; + private ZMQ.Socket subscriber; + private ZMQ.Socket replySocket; + + private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter(); + + private final RpcFacade facade = new RpcFacade(); + private final RpcListener listener = new RpcListener(); + + private String pubPort = System.getProperty("pub.port");//port on which announcements are sent + private String subPort = System.getProperty("sub.port");//other controller's pub port + private String pubIp = System.getProperty("pub.ip"); //other controller's ip + private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received + + + private ZeroMqRpcRouter() { + } + + public static ZeroMqRpcRouter getInstance() { + return _instance; + } + + public void start() { + context = ZMQ.context(2); + serverPool = Executors.newSingleThreadExecutor(); + handlersPool = Executors.newCachedThreadPool(); + routingTable = new ConcurrentHashMap, String>(); + + // Start listening for announce and rpc messages + serverPool.execute(receive()); + + + brokerSession.addRpcRegistrationListener(listener); + Set currentlySupported = brokerSession.getSupportedRpcs(); + for(QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); + } + + + } + + public void stop() { + if (handlersPool != null) handlersPool.shutdown(); + if (serverPool != null) serverPool.shutdown(); + if (publisher != null) { + publisher.setLinger(0); + publisher.close(); + } + if (replySocket != null) { + replySocket.setLinger(0); + replySocket.close(); + } + if (subscriber != null) { + subscriber.setLinger(0); + subscriber.close(); + } + if (context != null) context.term(); + + + } + + private Runnable receive() { + return new Runnable() { + public void run() { + try { + // Bind to RPC reply socket + replySocket = context.socket(ZMQ.REP); + replySocket.bind("tcp://*:" + rpcPort); + + // Bind to publishing controller + subscriber = context.socket(ZMQ.SUB); + subscriber.connect("tcp://" + pubIp + ":" + subPort); + System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]"); + + subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE)); + + // Initialize poll set + ZMQ.Poller poller = new ZMQ.Poller(2); + poller.register(replySocket, ZMQ.Poller.POLLIN); + poller.register(subscriber, ZMQ.Poller.POLLIN); + + while (!Thread.currentThread().isInterrupted()) { + + poller.poll(250); + //TODO: Fix this + if (poller.pollin(0)) { + //receive rpc request and reply + try { + Message req = parseMessage(replySocket); + Message resp = new Message(); + //Call broker to process the message then reply + Future> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload()); + RpcResult result = rpc.get(); + resp.setType(Message.MessageType.RESPONSE); + resp.setSender(getLocalIpAddress() + ":" + rpcPort); + resp.setRoute(req.getRoute()); + resp.setPayload(result.isSuccessful()); + replySocket.send(Message.serialize(resp)); + + } catch (IOException ex) {// | ClassNotFoundException ex) { + System.out.println("Rpc request could not be handled" + ex); + } + } + if (poller.pollin(1)) { + //get subscription and update routing table + //try { + Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv()); + System.out.println("Topic:[" + topic + "]"); + + if (subscriber.hasReceiveMore()) { + try { + Message m = (Message) Message.deserialize(subscriber.recv()); + System.out.println(m); + //TODO: check on msg type or topic. Both should be same. Need to normalize. + if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +// + } + } + } catch (Exception e) { + e.printStackTrace(); + } + replySocket.setLinger(0); + replySocket.close(); + subscriber.setLinger(0); + subscriber.close(); + } + }; + } + + private void updateRoutingTable(Message msg) { + routingTable.put(msg.getRoute(), msg.getSender()); + RpcRouter.RouteIdentifier route = msg.getRoute(); + QName rpcType = route.getType(); + System.out.println("Routing Table\n" + routingTable); + + RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade); + } + + private Message parseMessage(ZMQ.Socket socket) { + //Message m = new Message(); + //socket.setReceiveBufferSize(40000); + Message msg = null; + try { + byte[] bytes = socket.recv(); + System.out.println("Received bytes:[" + bytes.length + "]"); + msg = (Message) Message.deserialize(bytes); + } catch (Throwable t) { + System.out.println("Caught exception"); + t.printStackTrace(); + } + return msg; + /*m.setType((Message.MessageType) Message.deserialize(socket.recv())); + + if (socket.hasReceiveMore()) { + m.setSender((String) Message.deserialize(socket.recv())); + } + if (socket.hasReceiveMore()) { + m.setRoute((RouteIdentifier) Message.deserialize(socket.recv())); + } + if (socket.hasReceiveMore()) { + m.setPayload(Message.deserialize(socket.recv())); + } + return m;*/ + } + + @Override + public Future> sendRpc(final RpcRequest input) { + + return handlersPool.submit(new Callable>() { + + @Override + public RpcReply call() { + ZMQ.Socket requestSocket = context.socket(ZMQ.REQ); + Message req = new Message(); + Message resp = null; + RpcReplyImpl reply = new RpcReplyImpl(); + requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute())); + + req.setType(Message.MessageType.REQUEST); + req.setSender(getLocalIpAddress() + ":" + rpcPort); + req.setRoute(input.getRoutingInformation()); + req.setPayload(input.getPayload()); + try { + requestSocket.send(Message.serialize(req)); + resp = parseMessage(requestSocket); + reply.setPayload(resp.getPayload()); + } catch (IOException ex) {//| ClassNotFoundException ex) { + //Log and ignore + System.out.println("Error in RPC send. Input could not be serialized[" + input + "]"); + } + + return reply; + } + }); + } + + public void publish(final Message message) { + Runnable task = new Runnable() { + public void run() { + // Bind to publishing port + publisher = context.socket(ZMQ.PUB); + publisher.bind("tcp://*:" + pubPort); + System.out.println("Publisher started at port[" + pubPort + "]"); + try { + Message outMessage = new Message(); + outMessage.setType(Message.MessageType.ANNOUNCE); + outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort); + outMessage.setRoute(message.getRoute()); + + System.out.println("Sending announcement[" + outMessage + "]"); + publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE)); + publisher.send(Message.serialize(outMessage)); + + } catch (IOException ex) { + //Log and ignore + System.out.println("Error in publishing"); + ex.printStackTrace(); + } + System.out.println("Published message[" + message + "]"); + publisher.close(); + } + }; + handlersPool.execute(task); + } + + private String getLocalIpAddress() { + String hostAddress = null; + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + e1.printStackTrace(); + } + while (e.hasMoreElements()) { + + NetworkInterface n = (NetworkInterface) e.nextElement(); + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) + hostAddress = i.getHostAddress(); + } + } + + return hostAddress; + + + } + + + private class RpcFacade implements RpcImplementation { + + + @Override + public Set getSupportedRpcs() { + return Collections.emptySet(); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + + RpcRequestImpl request = new RpcRequestImpl(); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setContext(null); + routeId.setRoute(null); + routeId.setType(rpc); + + request.setRouteIdentifier(routeId); + request.setPayload(input); + // Create message + + Future> ret = sendRpc(request); + + return null; + } + } + + private class RpcListener implements RpcRegistrationListener { + + @Override + public void onRpcImplementationAdded(QName name) { + + Message msg = new Message(); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + msg.setRoute(routeId); + publish(msg); + } + + @Override + public void onRpcImplementationRemoved(QName name) { + // TODO Auto-generated method stub + + } + } + + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + + } + +}