1 package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
3 import java.io.IOException;
4 import java.net.Inet4Address;
5 import java.net.InetAddress;
6 import java.net.NetworkInterface;
7 import java.net.SocketException;
8 import java.util.Collection;
9 import java.util.Collections;
10 import java.util.Enumeration;
13 import java.util.concurrent.Callable;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
20 import org.opendaylight.controller.sal.connector.api.RpcRouter;
21 import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
22 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
23 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
24 import org.opendaylight.controller.sal.core.api.RpcImplementation;
25 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
26 import org.opendaylight.yangtools.yang.common.QName;
27 import org.opendaylight.yangtools.yang.common.RpcError;
28 import org.opendaylight.yangtools.yang.common.RpcResult;
29 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.zeromq.ZMQ;
34 * ZeroMq based implementation of RpcRouter
36 * 1. Make it multi VM aware
37 * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
38 * 3. sendRpc() should use connection pooling
39 * 4. Read properties from config file using existing(?) ODL properties framework
41 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
43 private ExecutorService serverPool;
44 private static ExecutorService handlersPool;
46 private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
48 private ProviderSession brokerSession;
50 private ZMQ.Context context;
51 private ZMQ.Socket publisher;
52 private ZMQ.Socket subscriber;
53 private ZMQ.Socket replySocket;
55 private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
57 private final RpcFacade facade = new RpcFacade();
58 private final RpcListener listener = new RpcListener();
60 private final String localIp = getLocalIpAddress();
62 private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
63 private String subPort = System.getProperty("sub.port");// other controller's pub port
64 private String pubIp = System.getProperty("pub.ip"); // other controller's ip
65 private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
/**
 * Private so that no code outside this class can create a router; the only instance is the one
 * held in {@code _instance}.
 */
private ZeroMqRpcRouter() {
  // Nothing to do here: all state comes from the field initialisers.
}
71 public static ZeroMqRpcRouter getInstance() {
// NOTE(review): the declaration line of the enclosing method is not visible in this excerpt.
// The statements below create the ZeroMQ context, bind the PUB socket on pubPort, create both
// executor pools and the routing table, and hand the receive loop to serverPool.
76 context = ZMQ.context(2);
77 publisher = context.socket(ZMQ.PUB);
78 int ret = publisher.bind("tcp://*:" + pubPort);
79 System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
80 // serverPool = Executors.newSingleThreadExecutor();
81 serverPool = Executors.newCachedThreadPool();
82 handlersPool = Executors.newCachedThreadPool();
83 routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
85 // Start listening for announce and rpc messages
86 serverPool.execute(receive());
// brokerSession has no initial value; it must have been supplied through setBrokerSession()
// before this point because it is dereferenced here.
88 brokerSession.addRpcRegistrationListener(listener);
// Feed every RPC the broker already supports through the same listener callback that is used
// for RPCs added later.
90 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
91 for (QName rpc : currentlySupported) {
92 listener.onRpcImplementationAdded(rpc);
// NOTE(review): the declaration line of the enclosing method is not visible in this excerpt.
// Ask both executor pools to shut down, skipping any pool that was never created.
98 if (handlersPool != null)
99 handlersPool.shutdown();
100 if (serverPool != null)
101 serverPool.shutdown();
// For each socket that exists, set its linger value to 0.
// NOTE(review): the lines following each setLinger(0) call are not visible here; presumably the
// socket is closed there - confirm.
102 if (publisher != null) {
103 publisher.setLinger(0);
106 if (replySocket != null) {
107 replySocket.setLinger(0);
110 if (subscriber != null) {
111 subscriber.setLinger(0);
// Builds the Runnable that start-up hands to serverPool. It creates the REP and SUB sockets,
// registers both with one poller and then loops until the running thread is interrupted.
119 private Runnable receive() {
120 return new Runnable() {
123 // Bind to RPC reply socket
124 replySocket = context.socket(ZMQ.REP);
125 replySocket.bind("tcp://*:" + rpcPort);
127 // Bind to publishing controller
// (the SUB socket connects to pubIp:subPort rather than binding)
128 subscriber = context.socket(ZMQ.SUB);
129 subscriber.connect("tcp://" + pubIp + ":" + subPort);
130 System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
131 + pubIp + ":" + subPort + "]");
133 //subscribe for announcements
134 //TODO: Message type would be changed. Update this
// The subscription filter is the serialized ANNOUNCE message type.
135 subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
137 // Poller enables listening on multiple sockets using a single thread
138 ZMQ.Poller poller = new ZMQ.Poller(2);
// Registration order matters: replySocket is registered first and subscriber second, which is
// presumably why index 0 is treated as the RPC socket and index 1 as the announcement socket
// below - confirm against the poller API.
139 poller.register(replySocket, ZMQ.Poller.POLLIN);
140 poller.register(subscriber, ZMQ.Poller.POLLIN);
141 System.out.println(Thread.currentThread().getName() + "-Start Polling");
143 //TODO: Add code to restart the thread after exception
144 while (!Thread.currentThread().isInterrupted()) {
// NOTE(review): the poll call itself and the body of the index-0 branch are not visible in this
// excerpt.
148 if (poller.pollin(0)) {
151 if (poller.pollin(1)) {
152 handleAnnouncement();
// Any exception raised while setting up or polling ends up here; per the TODO above the loop is
// not restarted afterwards.
155 } catch (Exception e) {
// Both sockets get linger 0 on the way out.
158 replySocket.setLinger(0);
160 subscriber.setLinger(0);
// Reads one announcement from the subscriber socket: the first frame is deserialized as the
// topic (a MessageType); if another frame follows it is deserialized as the Message itself and,
// when its type is ANNOUNCE, passed to updateRoutingTable().
// NOTE(review): both frames are turned into objects by Message.deserialize() straight from the
// network - confirm that the peer is trusted or that deserialization is filtered.
167 * @throws IOException
168 * @throws ClassNotFoundException
170 private void handleAnnouncement() throws IOException, ClassNotFoundException {
171 System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
// Failures while decoding the topic frame propagate to the caller through the throws clause.
172 Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
173 System.out.println("Topic:[" + topic + "]");
175 if (subscriber.hasReceiveMore()) {
177 Message m = (Message) Message.deserialize(subscriber.recv());
178 System.out.println(m);
179 // TODO: check on msg type or topic. Both
180 // should be same. Need to normalize.
181 if (Message.MessageType.ANNOUNCE == m.getType())
182 updateRoutingTable(m);
// Failures while decoding the second frame are caught here instead of propagating.
// NOTE(review): the handler body is not visible in this excerpt.
183 } catch (IOException | ClassNotFoundException e) {
// Serves one RPC request on the reply socket: parse the request, ask the broker to execute it,
// wait for the result, then send a RESPONSE message back on the same socket.
191 * @throws InterruptedException
192 * @throws ExecutionException
194 private void handleRpcCall() throws InterruptedException, ExecutionException {
// NOTE(review): parseMessage() catches its own failures, so req may be null here - confirm what
// it returns in that case; req is dereferenced below without a check.
196 Message req = parseMessage(replySocket);
198 System.out.println("Received RPC request [" + req + "]");
200 // Call broker to process the message then reply
201 Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
202 (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
// Blocks the calling thread until the broker has produced the result.
204 RpcResult<CompositeNode> result = rpc.get();
// The response echoes the request's route; the result is not attached yet (see TODO below).
206 Message response = new Message.MessageBuilder()
207 .type(MessageType.RESPONSE)
208 .sender(localIp + ":" + rpcPort)
209 .route(req.getRoute())
210 //.payload(result) TODO: enable and test
213 replySocket.send(Message.serialize(response));
215 System.out.println("Sent RPC response [" + response + "]");
// On an IOException nothing is sent back on the reply socket; the failure is only printed.
217 } catch (IOException ex) {
218 //TODO: handle exception and send error codes to caller
219 System.out.println("Rpc request could not be handled" + ex);
225 public Future<RpcReply<Object>> sendRpc(
226 final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
228 return handlersPool.submit(new Callable<RpcReply<Object>>() {
231 public RpcReply<Object> call() {
232 ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
234 // TODO pick the ip and port from routing table based on routing identifier
235 requestSocket.connect("tcp://" + pubIp + ":5554");
237 Message requestMessage = new Message.MessageBuilder()
238 .type(MessageType.REQUEST)
239 .sender(localIp + ":" + rpcPort)
240 .route(input.getRoutingInformation())
241 .payload(input.getPayload())
244 RpcReply<Object> reply = null;
247 System.out.println("\n\nRPC Request [" + requestMessage + "]");
249 requestSocket.send(Message.serialize(requestMessage));
250 final Message resp = parseMessage(requestSocket);
252 System.out.println("\n\nRPC Response [" + resp + "]");
254 reply = new RpcReply<Object>() {
257 public Object getPayload() {
258 return resp.getPayload();
261 } catch (IOException ex) {
262 // TODO: Pass exception back to the caller
263 System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
// Sends an announcement as a two-part message on the PUB socket: first the serialized ANNOUNCE
// message type, then the serialized notice. The send runs as a task on handlersPool, not on the
// caller's thread.
// NOTE(review): handlersPool is a cached thread pool and the publisher socket is shared, so two
// publish tasks could interleave their sendMore()/send() pairs - confirm whether callers
// serialize publishing, since the class comment says zmq sockets are not thread safe.
272 * TODO: Remove this implementation and use RoutingTable implementation to send announcements
273 * Publishes a notice to other controllers in the cluster
277 public void publish(final Message notice) {
278 Runnable task = new Runnable() {
282 Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
287 Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
// Frame 1: the topic; frame 2: the notice itself.
289 publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
290 publisher.send(Message.serialize(notice));
// A serialization failure is reported on stdout/stderr only; the caller is not informed.
292 } catch (IOException ex) {
293 System.out.println("Error in publishing");
294 ex.printStackTrace();
296 System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
301 handlersPool.execute(task);
305 * Finds IPv4 address of the local VM
306 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
307 * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
308 * Should we use IP or hostname?
312 private String getLocalIpAddress() {
313 String hostAddress = null;
314 Enumeration e = null;
316 e = NetworkInterface.getNetworkInterfaces();
317 } catch (SocketException e1) {
318 e1.printStackTrace();
320 while (e.hasMoreElements()) {
322 NetworkInterface n = (NetworkInterface) e.nextElement();
324 Enumeration ee = n.getInetAddresses();
325 while (ee.hasMoreElements()) {
326 InetAddress i = (InetAddress) ee.nextElement();
327 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
328 hostAddress = i.getHostAddress();
336 * TODO: Change to use external routing table implementation
340 private void updateRoutingTable(Message msg) {
341 routingTable.put(msg.getRoute(), msg.getSender());
342 RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
344 // Currently only registers rpc implementation.
345 // TODO: do registration for instance based routing
346 QName rpcType = route.getType();
347 RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
354 private Message parseMessage(ZMQ.Socket socket) {
358 byte[] bytes = socket.recv();
359 System.out.println("Received bytes:[" + bytes.length + "]");
360 msg = (Message) Message.deserialize(bytes);
361 } catch (Throwable t) {
362 System.out.println("Caught exception");
// RpcImplementation that the router registers with the broker (see updateRoutingTable); an
// invocation is turned into an RpcRequest and forwarded through sendRpc().
368 private class RpcFacade implements RpcImplementation {
// Reports no supported RPCs of its own.
371 public Set<QName> getSupportedRpcs() {
372 return Collections.emptySet();
376 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
// Only the RPC type is set on the route identifier.
378 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
379 routeId.setType(rpc);
381 RpcRequestImpl request = new RpcRequestImpl();
382 request.setRouteIdentifier(routeId);
383 request.setPayload(input);
// sendRpc() is asynchronous; ret is the future for the remote reply.
385 final Future<RpcReply<Object>> ret = sendRpc(request);
387 //TODO: Review result handling
388 RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
// NOTE(review): the bodies of isSuccessful() and getResult() are not visible in this excerpt;
// isSuccessful() handles InterruptedException/ExecutionException, so it presumably waits on
// ret - confirm.
390 public boolean isSuccessful() {
393 } catch (InterruptedException | ExecutionException e) {
401 public CompositeNode getResult() {
// Collections.EMPTY_LIST is the raw-typed constant; Collections.emptyList() would give the same
// empty list without an unchecked conversion.
406 public Collection<RpcError> getErrors() {
407 return Collections.EMPTY_LIST;
// Registered with the broker session during start-up, and also called directly there once for
// every RPC the broker already supports.
415 * Listener for rpc registrations
417 private class RpcListener implements RpcRegistrationListener {
420 public void onRpcImplementationAdded(QName name) {
421 System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
// Only the RPC type is set on the route identifier.
423 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
424 routeId.setType(name);
426 //TODO: Make notice immutable and change message type
// The sender advertised here is "tcp://<localIp>:<rpcPort>", whereas the request and response
// messages built elsewhere in this class use "<localIp>:<rpcPort>" without the scheme.
// NOTE(review): the rest of the builder chain and what is done with the notice are not visible
// in this excerpt; presumably it is handed to publish() - confirm.
427 Message notice = new Message.MessageBuilder()
428 .type(MessageType.ANNOUNCE)
429 .sender("tcp://" + localIp + ":" + rpcPort)
437 public void onRpcImplementationRemoved(QName name) {
438 // TODO: send a rpc-deregistrtation notice
443 public void setBrokerSession(ProviderSession session) {
444 this.brokerSession = session;