Updating ZeroMQ connector implementation. Its a work inprogress. The current implemen...
[controller.git] / opendaylight / md-sal / sal-zeromq-connector / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / router / zeromq / ZeroMqRpcRouter.java
1 package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
2
3 import java.io.IOException;
4 import java.net.Inet4Address;
5 import java.net.InetAddress;
6 import java.net.NetworkInterface;
7 import java.net.SocketException;
8 import java.util.Collection;
9 import java.util.Collections;
10 import java.util.Enumeration;
11 import java.util.Map;
12 import java.util.Set;
13 import java.util.concurrent.Callable;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19
20 import org.opendaylight.controller.sal.connector.api.RpcRouter;
21 import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
22 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
23 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
24 import org.opendaylight.controller.sal.core.api.RpcImplementation;
25 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
26 import org.opendaylight.yangtools.yang.common.QName;
27 import org.opendaylight.yangtools.yang.common.RpcError;
28 import org.opendaylight.yangtools.yang.common.RpcResult;
29 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.zeromq.ZMQ;
32
33 /**
34  * ZeroMq based implementation of RpcRouter
35  * TODO:
36  *    1. Make it multi VM aware
37  *    2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
38  *    3. sendRpc() should use connection pooling
39  *    4. Read properties from config file using existing(?) ODL properties framework
40  */
41 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
42
43   private ExecutorService serverPool;
44   private static ExecutorService handlersPool;
45
46   private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
47
48   private ProviderSession brokerSession;
49
50   private ZMQ.Context context;
51   private ZMQ.Socket publisher;
52   private ZMQ.Socket subscriber;
53   private ZMQ.Socket replySocket;
54
55   private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
56
57   private final RpcFacade facade = new RpcFacade();
58   private final RpcListener listener = new RpcListener();
59
60   private final String localIp = getLocalIpAddress();
61
62   private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
63   private String subPort = System.getProperty("sub.port");// other controller's pub port
64   private String pubIp = System.getProperty("pub.ip"); // other controller's ip
65   private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
66
67   //Prevent instantiation
68   private ZeroMqRpcRouter() {
69   }
70
71   public static ZeroMqRpcRouter getInstance() {
72     return _instance;
73   }
74
75   public void start() {
76     context = ZMQ.context(2);
77     publisher = context.socket(ZMQ.PUB);
78     int ret = publisher.bind("tcp://*:" + pubPort);
79     System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
80     // serverPool = Executors.newSingleThreadExecutor();
81     serverPool = Executors.newCachedThreadPool();
82     handlersPool = Executors.newCachedThreadPool();
83     routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
84
85     // Start listening for announce and rpc messages
86     serverPool.execute(receive());
87
88     brokerSession.addRpcRegistrationListener(listener);
89
90     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
91     for (QName rpc : currentlySupported) {
92       listener.onRpcImplementationAdded(rpc);
93     }
94
95   }
96
97   public void stop() {
98     if (handlersPool != null)
99       handlersPool.shutdown();
100     if (serverPool != null)
101       serverPool.shutdown();
102     if (publisher != null) {
103       publisher.setLinger(0);
104       publisher.close();
105     }
106     if (replySocket != null) {
107       replySocket.setLinger(0);
108       replySocket.close();
109     }
110     if (subscriber != null) {
111       subscriber.setLinger(0);
112       subscriber.close();
113     }
114     if (context != null)
115       context.term();
116
117   }
118
119   private Runnable receive() {
120     return new Runnable() {
121       public void run() {
122         try {
123           // Bind to RPC reply socket
124           replySocket = context.socket(ZMQ.REP);
125           replySocket.bind("tcp://*:" + rpcPort);
126
127           // Bind to publishing controller
128           subscriber = context.socket(ZMQ.SUB);
129           subscriber.connect("tcp://" + pubIp + ":" + subPort);
130           System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
131               + pubIp + ":" + subPort + "]");
132
133           //subscribe for announcements
134           //TODO: Message type would be changed. Update this
135           subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
136
137           // Poller enables listening on multiple sockets using a single thread
138           ZMQ.Poller poller = new ZMQ.Poller(2);
139           poller.register(replySocket, ZMQ.Poller.POLLIN);
140           poller.register(subscriber, ZMQ.Poller.POLLIN);
141           System.out.println(Thread.currentThread().getName() + "-Start Polling");
142
143           //TODO: Add code to restart the thread after exception
144           while (!Thread.currentThread().isInterrupted()) {
145
146             poller.poll();
147
148             if (poller.pollin(0)) {
149               handleRpcCall();
150             }
151             if (poller.pollin(1)) {
152               handleAnnouncement();
153             }
154           }
155         } catch (Exception e) {
156           e.printStackTrace();
157         }
158         replySocket.setLinger(0);
159         replySocket.close();
160         subscriber.setLinger(0);
161         subscriber.close();
162       }
163     };
164   }
165
166   /**
167    * @throws IOException
168    * @throws ClassNotFoundException
169    */
170   private void handleAnnouncement() throws IOException, ClassNotFoundException {
171     System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
172     Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
173     System.out.println("Topic:[" + topic + "]");
174
175     if (subscriber.hasReceiveMore()) {
176       try {
177         Message m = (Message) Message.deserialize(subscriber.recv());
178         System.out.println(m);
179         // TODO: check on msg type or topic. Both
180         // should be same. Need to normalize.
181         if (Message.MessageType.ANNOUNCE == m.getType())
182           updateRoutingTable(m);
183       } catch (IOException | ClassNotFoundException e) {
184         e.printStackTrace();
185       }
186     }
187
188   }
189
190   /**
191    * @throws InterruptedException
192    * @throws ExecutionException
193    */
194   private void handleRpcCall() throws InterruptedException, ExecutionException {
195     try {
196       Message req = parseMessage(replySocket);
197
198       System.out.println("Received RPC request [" + req + "]");
199
200       // Call broker to process the message then reply
201       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
202           (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
203
204       RpcResult<CompositeNode> result = rpc.get();
205
206       Message response = new Message.MessageBuilder()
207           .type(MessageType.RESPONSE)
208           .sender(localIp + ":" + rpcPort)
209           .route(req.getRoute())
210           //.payload(result)    TODO: enable and test
211           .build();
212
213       replySocket.send(Message.serialize(response));
214
215       System.out.println("Sent RPC response [" + response + "]");
216
217     } catch (IOException ex) {
218       //TODO: handle exception and send error codes to caller
219       System.out.println("Rpc request could not be handled" + ex);
220     }
221   }
222
223
224   @Override
225   public Future<RpcReply<Object>> sendRpc(
226       final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
227
228     return handlersPool.submit(new Callable<RpcReply<Object>>() {
229
230       @Override
231       public RpcReply<Object> call() {
232         ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
233
234         // TODO pick the ip and port from routing table based on routing identifier
235         requestSocket.connect("tcp://" + pubIp + ":5554");
236
237         Message requestMessage = new Message.MessageBuilder()
238             .type(MessageType.REQUEST)
239             .sender(localIp + ":" + rpcPort)
240             .route(input.getRoutingInformation())
241             .payload(input.getPayload())
242             .build();
243
244         RpcReply<Object> reply = null;
245
246         try {
247           System.out.println("\n\nRPC Request [" + requestMessage + "]");
248
249           requestSocket.send(Message.serialize(requestMessage));
250           final Message resp = parseMessage(requestSocket);
251
252           System.out.println("\n\nRPC Response [" + resp + "]");
253
254           reply = new RpcReply<Object>() {
255
256             @Override
257             public Object getPayload() {
258               return resp.getPayload();
259             }
260           };
261         } catch (IOException ex) {
262           // TODO: Pass exception back to the caller
263           System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
264         }
265
266         return reply;
267       }
268     });
269   }
270
271   /**
272    * TODO: Remove this implementation and use RoutingTable implementation to send announcements
273    * Publishes a notice to other controllers in the cluster
274    *
275    * @param notice
276    */
277   public void publish(final Message notice) {
278     Runnable task = new Runnable() {
279       public void run() {
280
281         System.out.println(
282             Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
283
284         try {
285
286           System.out.println(
287               Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
288
289           publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
290           publisher.send(Message.serialize(notice));
291
292         } catch (IOException ex) {
293           System.out.println("Error in publishing");
294           ex.printStackTrace();
295         }
296         System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
297             + "]");
298
299       }
300     };
301     handlersPool.execute(task);
302   }
303
304   /**
305    * Finds IPv4 address of the local VM
306    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
307    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
308    * Should we use IP or hostname?
309    *
310    * @return
311    */
312   private String getLocalIpAddress() {
313     String hostAddress = null;
314     Enumeration e = null;
315     try {
316       e = NetworkInterface.getNetworkInterfaces();
317     } catch (SocketException e1) {
318       e1.printStackTrace();
319     }
320     while (e.hasMoreElements()) {
321
322       NetworkInterface n = (NetworkInterface) e.nextElement();
323
324       Enumeration ee = n.getInetAddresses();
325       while (ee.hasMoreElements()) {
326         InetAddress i = (InetAddress) ee.nextElement();
327         if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
328           hostAddress = i.getHostAddress();
329       }
330     }
331     return hostAddress;
332
333   }
334
335   /**
336    * TODO: Change to use external routing table implementation
337    *
338    * @param msg
339    */
340   private void updateRoutingTable(Message msg) {
341     routingTable.put(msg.getRoute(), msg.getSender());
342     RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
343
344     // Currently only registers rpc implementation.
345     // TODO: do registration for instance based routing
346     QName rpcType = route.getType();
347     RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
348   }
349
350   /**
351    * @param socket
352    * @return
353    */
354   private Message parseMessage(ZMQ.Socket socket) {
355
356     Message msg = null;
357     try {
358       byte[] bytes = socket.recv();
359       System.out.println("Received bytes:[" + bytes.length + "]");
360       msg = (Message) Message.deserialize(bytes);
361     } catch (Throwable t) {
362       System.out.println("Caught exception");
363       t.printStackTrace();
364     }
365     return msg;
366   }
367
368   private class RpcFacade implements RpcImplementation {
369
370     @Override
371     public Set<QName> getSupportedRpcs() {
372       return Collections.emptySet();
373     }
374
375     @Override
376     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
377
378       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
379       routeId.setType(rpc);
380
381       RpcRequestImpl request = new RpcRequestImpl();
382       request.setRouteIdentifier(routeId);
383       request.setPayload(input);
384
385       final Future<RpcReply<Object>> ret = sendRpc(request);
386
387       //TODO: Review result handling
388       RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
389         @Override
390         public boolean isSuccessful() {
391           try {
392             ret.get();
393           } catch (InterruptedException | ExecutionException e) {
394             e.printStackTrace();
395             return false;
396           }
397           return true;
398         }
399
400         @Override
401         public CompositeNode getResult() {
402           return null;
403         }
404
405         @Override
406         public Collection<RpcError> getErrors() {
407           return Collections.EMPTY_LIST;
408         }
409       };
410       return result;
411     }
412   }
413
414   /**
415    * Listener for rpc registrations
416    */
417   private class RpcListener implements RpcRegistrationListener {
418
419     @Override
420     public void onRpcImplementationAdded(QName name) {
421       System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
422
423       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
424       routeId.setType(name);
425
426       //TODO: Make notice immutable and change message type
427       Message notice = new Message.MessageBuilder()
428           .type(MessageType.ANNOUNCE)
429           .sender("tcp://" + localIp + ":" + rpcPort)
430           .route(routeId)
431           .build();
432
433       publish(notice);
434     }
435
436     @Override
437     public void onRpcImplementationRemoved(QName name) {
438       // TODO: send a rpc-deregistrtation notice
439
440     }
441   }
442
443   public void setBrokerSession(ProviderSession session) {
444     this.brokerSession = session;
445
446   }
447
448 }