Adding copyright notice
[controller.git] / opendaylight / md-sal / sal-zeromq-connector / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / router / zeromq / ZeroMqRpcRouter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
9
10 import java.io.IOException;
11 import java.net.Inet4Address;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Enumeration;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.Future;
26
27 import org.opendaylight.controller.sal.connector.api.RpcRouter;
28 import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
29 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
30 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
31 import org.opendaylight.controller.sal.core.api.RpcImplementation;
32 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
37 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
38 import org.zeromq.ZMQ;
39
40 /**
41  * ZeroMq based implementation of RpcRouter
42  * TODO:
43  *    1. Make it multi VM aware
44  *    2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
45  *    3. sendRpc() should use connection pooling
46  *    4. Read properties from config file using existing(?) ODL properties framework
47  */
48 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
49
50   private ExecutorService serverPool;
51   private static ExecutorService handlersPool;
52
53   private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
54
55   private ProviderSession brokerSession;
56
57   private ZMQ.Context context;
58   private ZMQ.Socket publisher;
59   private ZMQ.Socket subscriber;
60   private ZMQ.Socket replySocket;
61
62   private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
63
64   private final RpcFacade facade = new RpcFacade();
65   private final RpcListener listener = new RpcListener();
66
67   private final String localIp = getLocalIpAddress();
68
69   private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
70   private String subPort = System.getProperty("sub.port");// other controller's pub port
71   private String pubIp = System.getProperty("pub.ip"); // other controller's ip
72   private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
73
74   //Prevent instantiation
75   private ZeroMqRpcRouter() {
76   }
77
78   public static ZeroMqRpcRouter getInstance() {
79     return _instance;
80   }
81
82   public void start() {
83     context = ZMQ.context(2);
84     publisher = context.socket(ZMQ.PUB);
85     int ret = publisher.bind("tcp://*:" + pubPort);
86     System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
87     // serverPool = Executors.newSingleThreadExecutor();
88     serverPool = Executors.newCachedThreadPool();
89     handlersPool = Executors.newCachedThreadPool();
90     routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
91
92     // Start listening for announce and rpc messages
93     serverPool.execute(receive());
94
95     brokerSession.addRpcRegistrationListener(listener);
96
97     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
98     for (QName rpc : currentlySupported) {
99       listener.onRpcImplementationAdded(rpc);
100     }
101
102   }
103
104   public void stop() {
105     if (handlersPool != null)
106       handlersPool.shutdown();
107     if (serverPool != null)
108       serverPool.shutdown();
109     if (publisher != null) {
110       publisher.setLinger(0);
111       publisher.close();
112     }
113     if (replySocket != null) {
114       replySocket.setLinger(0);
115       replySocket.close();
116     }
117     if (subscriber != null) {
118       subscriber.setLinger(0);
119       subscriber.close();
120     }
121     if (context != null)
122       context.term();
123
124   }
125
126   private Runnable receive() {
127     return new Runnable() {
128       public void run() {
129         try {
130           // Bind to RPC reply socket
131           replySocket = context.socket(ZMQ.REP);
132           replySocket.bind("tcp://*:" + rpcPort);
133
134           // Bind to publishing controller
135           subscriber = context.socket(ZMQ.SUB);
136           subscriber.connect("tcp://" + pubIp + ":" + subPort);
137           System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
138               + pubIp + ":" + subPort + "]");
139
140           //subscribe for announcements
141           //TODO: Message type would be changed. Update this
142           subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
143
144           // Poller enables listening on multiple sockets using a single thread
145           ZMQ.Poller poller = new ZMQ.Poller(2);
146           poller.register(replySocket, ZMQ.Poller.POLLIN);
147           poller.register(subscriber, ZMQ.Poller.POLLIN);
148           System.out.println(Thread.currentThread().getName() + "-Start Polling");
149
150           //TODO: Add code to restart the thread after exception
151           while (!Thread.currentThread().isInterrupted()) {
152
153             poller.poll();
154
155             if (poller.pollin(0)) {
156               handleRpcCall();
157             }
158             if (poller.pollin(1)) {
159               handleAnnouncement();
160             }
161           }
162         } catch (Exception e) {
163           e.printStackTrace();
164         }
165         replySocket.setLinger(0);
166         replySocket.close();
167         subscriber.setLinger(0);
168         subscriber.close();
169       }
170     };
171   }
172
173   /**
174    * @throws IOException
175    * @throws ClassNotFoundException
176    */
177   private void handleAnnouncement() throws IOException, ClassNotFoundException {
178     System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
179     Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
180     System.out.println("Topic:[" + topic + "]");
181
182     if (subscriber.hasReceiveMore()) {
183       try {
184         Message m = (Message) Message.deserialize(subscriber.recv());
185         System.out.println(m);
186         // TODO: check on msg type or topic. Both
187         // should be same. Need to normalize.
188         if (Message.MessageType.ANNOUNCE == m.getType())
189           updateRoutingTable(m);
190       } catch (IOException | ClassNotFoundException e) {
191         e.printStackTrace();
192       }
193     }
194
195   }
196
197   /**
198    * @throws InterruptedException
199    * @throws ExecutionException
200    */
201   private void handleRpcCall() throws InterruptedException, ExecutionException {
202     try {
203       Message req = parseMessage(replySocket);
204
205       System.out.println("Received RPC request [" + req + "]");
206
207       // Call broker to process the message then reply
208       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
209           (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
210
211       RpcResult<CompositeNode> result = rpc.get();
212
213       Message response = new Message.MessageBuilder()
214           .type(MessageType.RESPONSE)
215           .sender(localIp + ":" + rpcPort)
216           .route(req.getRoute())
217           //.payload(result)    TODO: enable and test
218           .build();
219
220       replySocket.send(Message.serialize(response));
221
222       System.out.println("Sent RPC response [" + response + "]");
223
224     } catch (IOException ex) {
225       //TODO: handle exception and send error codes to caller
226       System.out.println("Rpc request could not be handled" + ex);
227     }
228   }
229
230
231   @Override
232   public Future<RpcReply<Object>> sendRpc(
233       final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
234
235     return handlersPool.submit(new Callable<RpcReply<Object>>() {
236
237       @Override
238       public RpcReply<Object> call() {
239         ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
240
241         // TODO pick the ip and port from routing table based on routing identifier
242         requestSocket.connect("tcp://" + pubIp + ":5554");
243
244         Message requestMessage = new Message.MessageBuilder()
245             .type(MessageType.REQUEST)
246             .sender(localIp + ":" + rpcPort)
247             .route(input.getRoutingInformation())
248             .payload(input.getPayload())
249             .build();
250
251         RpcReply<Object> reply = null;
252
253         try {
254           System.out.println("\n\nRPC Request [" + requestMessage + "]");
255
256           requestSocket.send(Message.serialize(requestMessage));
257           final Message resp = parseMessage(requestSocket);
258
259           System.out.println("\n\nRPC Response [" + resp + "]");
260
261           reply = new RpcReply<Object>() {
262
263             @Override
264             public Object getPayload() {
265               return resp.getPayload();
266             }
267           };
268         } catch (IOException ex) {
269           // TODO: Pass exception back to the caller
270           System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
271         }
272
273         return reply;
274       }
275     });
276   }
277
278   /**
279    * TODO: Remove this implementation and use RoutingTable implementation to send announcements
280    * Publishes a notice to other controllers in the cluster
281    *
282    * @param notice
283    */
284   public void publish(final Message notice) {
285     Runnable task = new Runnable() {
286       public void run() {
287
288         System.out.println(
289             Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
290
291         try {
292
293           System.out.println(
294               Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
295
296           publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
297           publisher.send(Message.serialize(notice));
298
299         } catch (IOException ex) {
300           System.out.println("Error in publishing");
301           ex.printStackTrace();
302         }
303         System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
304             + "]");
305
306       }
307     };
308     handlersPool.execute(task);
309   }
310
311   /**
312    * Finds IPv4 address of the local VM
313    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
314    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
315    * Should we use IP or hostname?
316    *
317    * @return
318    */
319   private String getLocalIpAddress() {
320     String hostAddress = null;
321     Enumeration e = null;
322     try {
323       e = NetworkInterface.getNetworkInterfaces();
324     } catch (SocketException e1) {
325       e1.printStackTrace();
326     }
327     while (e.hasMoreElements()) {
328
329       NetworkInterface n = (NetworkInterface) e.nextElement();
330
331       Enumeration ee = n.getInetAddresses();
332       while (ee.hasMoreElements()) {
333         InetAddress i = (InetAddress) ee.nextElement();
334         if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
335           hostAddress = i.getHostAddress();
336       }
337     }
338     return hostAddress;
339
340   }
341
342   /**
343    * TODO: Change to use external routing table implementation
344    *
345    * @param msg
346    */
347   private void updateRoutingTable(Message msg) {
348     routingTable.put(msg.getRoute(), msg.getSender());
349     RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
350
351     // Currently only registers rpc implementation.
352     // TODO: do registration for instance based routing
353     QName rpcType = route.getType();
354     RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
355   }
356
357   /**
358    * @param socket
359    * @return
360    */
361   private Message parseMessage(ZMQ.Socket socket) {
362
363     Message msg = null;
364     try {
365       byte[] bytes = socket.recv();
366       System.out.println("Received bytes:[" + bytes.length + "]");
367       msg = (Message) Message.deserialize(bytes);
368     } catch (Throwable t) {
369       System.out.println("Caught exception");
370       t.printStackTrace();
371     }
372     return msg;
373   }
374
375   private class RpcFacade implements RpcImplementation {
376
377     @Override
378     public Set<QName> getSupportedRpcs() {
379       return Collections.emptySet();
380     }
381
382     @Override
383     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
384
385       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
386       routeId.setType(rpc);
387
388       RpcRequestImpl request = new RpcRequestImpl();
389       request.setRouteIdentifier(routeId);
390       request.setPayload(input);
391
392       final Future<RpcReply<Object>> ret = sendRpc(request);
393
394       //TODO: Review result handling
395       RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
396         @Override
397         public boolean isSuccessful() {
398           try {
399             ret.get();
400           } catch (InterruptedException | ExecutionException e) {
401             e.printStackTrace();
402             return false;
403           }
404           return true;
405         }
406
407         @Override
408         public CompositeNode getResult() {
409           return null;
410         }
411
412         @Override
413         public Collection<RpcError> getErrors() {
414           return Collections.EMPTY_LIST;
415         }
416       };
417       return result;
418     }
419   }
420
421   /**
422    * Listener for rpc registrations
423    */
424   private class RpcListener implements RpcRegistrationListener {
425
426     @Override
427     public void onRpcImplementationAdded(QName name) {
428       System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
429
430       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
431       routeId.setType(name);
432
433       //TODO: Make notice immutable and change message type
434       Message notice = new Message.MessageBuilder()
435           .type(MessageType.ANNOUNCE)
436           .sender("tcp://" + localIp + ":" + rpcPort)
437           .route(routeId)
438           .build();
439
440       publish(notice);
441     }
442
443     @Override
444     public void onRpcImplementationRemoved(QName name) {
445       // TODO: send a rpc-deregistrtation notice
446
447     }
448   }
449
450   public void setBrokerSession(ProviderSession session) {
451     this.brokerSession = session;
452
453   }
454
455 }