1 package org.opendaylight.controller.sal.connector.zeromq;
3 import org.opendaylight.controller.sal.connector.api.RpcRouter;
4 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
5 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
6 import org.opendaylight.controller.sal.core.api.RpcImplementation;
7 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
8 import org.opendaylight.yangtools.yang.common.QName;
9 import org.opendaylight.yangtools.yang.common.RpcResult;
10 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
11 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
12 import org.zeromq.ZMQ;
14 import java.io.IOException;
15 import java.net.Inet4Address;
16 import java.net.InetAddress;
17 import java.net.NetworkInterface;
18 import java.net.SocketException;
20 import java.util.concurrent.*;
22 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
24 private ExecutorService serverPool;
25 private static ExecutorService handlersPool;
27 private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
29 private ProviderSession brokerSession;
31 private ZMQ.Context context;
32 private ZMQ.Socket publisher;
33 private ZMQ.Socket subscriber;
34 private ZMQ.Socket replySocket;
36 private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
38 private final RpcFacade facade = new RpcFacade();
39 private final RpcListener listener = new RpcListener();
41 private String pubPort = System.getProperty("pub.port");//port on which announcements are sent
42 private String subPort = System.getProperty("sub.port");//other controller's pub port
43 private String pubIp = System.getProperty("pub.ip"); //other controller's ip
44 private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received
47 private ZeroMqRpcRouter() {
50 public static ZeroMqRpcRouter getInstance() {
55 context = ZMQ.context(2);
56 serverPool = Executors.newSingleThreadExecutor();
57 handlersPool = Executors.newCachedThreadPool();
58 routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
60 // Start listening for announce and rpc messages
61 serverPool.execute(receive());
64 brokerSession.addRpcRegistrationListener(listener);
65 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
66 for(QName rpc : currentlySupported) {
67 listener.onRpcImplementationAdded(rpc);
74 if (handlersPool != null) handlersPool.shutdown();
75 if (serverPool != null) serverPool.shutdown();
76 if (publisher != null) {
77 publisher.setLinger(0);
80 if (replySocket != null) {
81 replySocket.setLinger(0);
84 if (subscriber != null) {
85 subscriber.setLinger(0);
88 if (context != null) context.term();
93 private Runnable receive() {
94 return new Runnable() {
97 // Bind to RPC reply socket
98 replySocket = context.socket(ZMQ.REP);
99 replySocket.bind("tcp://*:" + rpcPort);
101 // Bind to publishing controller
102 subscriber = context.socket(ZMQ.SUB);
103 subscriber.connect("tcp://" + pubIp + ":" + subPort);
104 System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]");
106 subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
108 // Initialize poll set
109 ZMQ.Poller poller = new ZMQ.Poller(2);
110 poller.register(replySocket, ZMQ.Poller.POLLIN);
111 poller.register(subscriber, ZMQ.Poller.POLLIN);
113 while (!Thread.currentThread().isInterrupted()) {
117 if (poller.pollin(0)) {
118 //receive rpc request and reply
120 Message req = parseMessage(replySocket);
121 Message resp = new Message();
122 //Call broker to process the message then reply
123 Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
124 RpcResult<CompositeNode> result = rpc.get();
125 resp.setType(Message.MessageType.RESPONSE);
126 resp.setSender(getLocalIpAddress() + ":" + rpcPort);
127 resp.setRoute(req.getRoute());
128 resp.setPayload(result.isSuccessful());
129 replySocket.send(Message.serialize(resp));
131 } catch (IOException ex) {// | ClassNotFoundException ex) {
132 System.out.println("Rpc request could not be handled" + ex);
135 if (poller.pollin(1)) {
136 //get subscription and update routing table
138 Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv());
139 System.out.println("Topic:[" + topic + "]");
141 if (subscriber.hasReceiveMore()) {
143 Message m = (Message) Message.deserialize(subscriber.recv());
144 System.out.println(m);
145 //TODO: check on msg type or topic. Both should be same. Need to normalize.
146 if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m);
147 } catch (IOException | ClassNotFoundException e) {
148 e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
154 } catch (Exception e) {
157 replySocket.setLinger(0);
159 subscriber.setLinger(0);
165 private void updateRoutingTable(Message msg) {
166 routingTable.put(msg.getRoute(), msg.getSender());
167 RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
168 QName rpcType = route.getType();
169 System.out.println("Routing Table\n" + routingTable);
171 RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
174 private Message parseMessage(ZMQ.Socket socket) {
175 //Message m = new Message();
176 //socket.setReceiveBufferSize(40000);
179 byte[] bytes = socket.recv();
180 System.out.println("Received bytes:[" + bytes.length + "]");
181 msg = (Message) Message.deserialize(bytes);
182 } catch (Throwable t) {
183 System.out.println("Caught exception");
187 /*m.setType((Message.MessageType) Message.deserialize(socket.recv()));
189 if (socket.hasReceiveMore()) {
190 m.setSender((String) Message.deserialize(socket.recv()));
192 if (socket.hasReceiveMore()) {
193 m.setRoute((RouteIdentifier) Message.deserialize(socket.recv()));
195 if (socket.hasReceiveMore()) {
196 m.setPayload(Message.deserialize(socket.recv()));
202 public Future<RpcReply<Object>> sendRpc(final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
204 return handlersPool.submit(new Callable<RpcReply<Object>>() {
207 public RpcReply<Object> call() {
208 ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
209 Message req = new Message();
211 RpcReplyImpl reply = new RpcReplyImpl();
212 requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute()));
214 req.setType(Message.MessageType.REQUEST);
215 req.setSender(getLocalIpAddress() + ":" + rpcPort);
216 req.setRoute(input.getRoutingInformation());
217 req.setPayload(input.getPayload());
219 requestSocket.send(Message.serialize(req));
220 resp = parseMessage(requestSocket);
221 reply.setPayload(resp.getPayload());
222 } catch (IOException ex) {//| ClassNotFoundException ex) {
224 System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
232 public void publish(final Message message) {
233 Runnable task = new Runnable() {
235 // Bind to publishing port
236 publisher = context.socket(ZMQ.PUB);
237 publisher.bind("tcp://*:" + pubPort);
238 System.out.println("Publisher started at port[" + pubPort + "]");
240 Message outMessage = new Message();
241 outMessage.setType(Message.MessageType.ANNOUNCE);
242 outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort);
243 outMessage.setRoute(message.getRoute());
245 System.out.println("Sending announcement[" + outMessage + "]");
246 publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
247 publisher.send(Message.serialize(outMessage));
249 } catch (IOException ex) {
251 System.out.println("Error in publishing");
252 ex.printStackTrace();
254 System.out.println("Published message[" + message + "]");
258 handlersPool.execute(task);
261 private String getLocalIpAddress() {
262 String hostAddress = null;
263 Enumeration e = null;
265 e = NetworkInterface.getNetworkInterfaces();
266 } catch (SocketException e1) {
267 e1.printStackTrace();
269 while (e.hasMoreElements()) {
271 NetworkInterface n = (NetworkInterface) e.nextElement();
272 Enumeration ee = n.getInetAddresses();
273 while (ee.hasMoreElements()) {
274 InetAddress i = (InetAddress) ee.nextElement();
275 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
276 hostAddress = i.getHostAddress();
286 private class RpcFacade implements RpcImplementation {
290 public Set<QName> getSupportedRpcs() {
291 return Collections.emptySet();
295 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
297 RpcRequestImpl request = new RpcRequestImpl();
298 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
299 routeId.setContext(null);
300 routeId.setRoute(null);
301 routeId.setType(rpc);
303 request.setRouteIdentifier(routeId);
304 request.setPayload(input);
307 Future<org.opendaylight.controller.sal.connector.api.RpcRouter.RpcReply<Object>> ret = sendRpc(request);
313 private class RpcListener implements RpcRegistrationListener {
316 public void onRpcImplementationAdded(QName name) {
318 Message msg = new Message();
319 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
320 routeId.setType(name);
321 msg.setRoute(routeId);
326 public void onRpcImplementationRemoved(QName name) {
327 // TODO Auto-generated method stub
332 public void setBrokerSession(ProviderSession session) {
333 this.brokerSession = session;