Merge "Fixed sal-netconf-connector readConfiguration"
[controller.git] / opendaylight / md-sal / sal-zeromq-connector / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / router / zeromq / ZeroMqRpcRouter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
9
10 import java.io.IOException;
11 import java.net.Inet4Address;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Enumeration;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.Future;
26
27 import org.opendaylight.controller.sal.connector.api.RpcRouter;
28 import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
29 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
30 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
31 import org.opendaylight.controller.sal.core.api.RpcImplementation;
32 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
37 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.zeromq.ZMQ;
41
42 /**
43  * ZeroMq based implementation of RpcRouter
44  * TODO:
45  *    1. Make it multi VM aware
46  *    2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
47  *    3. sendRpc() should use connection pooling
48  *    4. Read properties from config file using existing(?) ODL properties framework
49  */
50 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
51
52   private ExecutorService serverPool;
53   private static ExecutorService handlersPool;
54
55   private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
56
57   private ProviderSession brokerSession;
58
59   private ZMQ.Context context;
60   private ZMQ.Socket publisher;
61   private ZMQ.Socket subscriber;
62   private ZMQ.Socket replySocket;
63
64   private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
65
66   private final RpcFacade facade = new RpcFacade();
67   private final RpcListener listener = new RpcListener();
68
69   private final String localIp = getLocalIpAddress();
70
71   private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
72   private String subPort = System.getProperty("sub.port");// other controller's pub port
73   private String pubIp = System.getProperty("pub.ip"); // other controller's ip
74   private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
75
76   private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
77
78   //Prevent instantiation
79   private ZeroMqRpcRouter() {
80   }
81
82   public static ZeroMqRpcRouter getInstance() {
83     return _instance;
84   }
85
86   public void start() {
87     context = ZMQ.context(2);
88     publisher = context.socket(ZMQ.PUB);
89     int ret = publisher.bind("tcp://*:" + pubPort);
90     // serverPool = Executors.newSingleThreadExecutor();
91     serverPool = Executors.newCachedThreadPool();
92     handlersPool = Executors.newCachedThreadPool();
93     routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
94
95     // Start listening for announce and rpc messages
96     serverPool.execute(receive());
97
98     brokerSession.addRpcRegistrationListener(listener);
99
100     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
101     for (QName rpc : currentlySupported) {
102       listener.onRpcImplementationAdded(rpc);
103     }
104
105   }
106
107   public void stop() {
108     if (handlersPool != null)
109       handlersPool.shutdown();
110     if (serverPool != null)
111       serverPool.shutdown();
112     if (publisher != null) {
113       publisher.setLinger(0);
114       publisher.close();
115     }
116     if (replySocket != null) {
117       replySocket.setLinger(0);
118       replySocket.close();
119     }
120     if (subscriber != null) {
121       subscriber.setLinger(0);
122       subscriber.close();
123     }
124     if (context != null)
125       context.term();
126
127   }
128
129   private Runnable receive() {
130     return new Runnable() {
131       public void run() {
132         try {
133           // Bind to RPC reply socket
134           replySocket = context.socket(ZMQ.REP);
135           replySocket.bind("tcp://*:" + rpcPort);
136
137           // Bind to publishing controller
138           subscriber = context.socket(ZMQ.SUB);
139           String pubAddress = "tcp://" + pubIp + ":" + subPort;
140           subscriber.connect(pubAddress);
141           _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
142
143           //subscribe for announcements
144           //TODO: Message type would be changed. Update this
145           subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
146
147           // Poller enables listening on multiple sockets using a single thread
148           ZMQ.Poller poller = new ZMQ.Poller(2);
149           poller.register(replySocket, ZMQ.Poller.POLLIN);
150           poller.register(subscriber, ZMQ.Poller.POLLIN);
151
152           //TODO: Add code to restart the thread after exception
153           while (!Thread.currentThread().isInterrupted()) {
154
155             poller.poll();
156
157             if (poller.pollin(0)) {
158               handleRpcCall();
159             }
160             if (poller.pollin(1)) {
161               handleAnnouncement();
162             }
163           }
164         } catch (Exception e) {
165           e.printStackTrace();
166         }
167         replySocket.setLinger(0);
168         replySocket.close();
169         subscriber.setLinger(0);
170         subscriber.close();
171       }
172     };
173   }
174
175   /**
176    * @throws IOException
177    * @throws ClassNotFoundException
178    */
179   private void handleAnnouncement() throws IOException, ClassNotFoundException {
180
181     _logger.info("Announcement received");
182     Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
183
184     if (subscriber.hasReceiveMore()) {
185       try {
186         Message m = (Message) Message.deserialize(subscriber.recv());
187         _logger.debug("Announcement message [{}]", m);
188
189         // TODO: check on msg type or topic. Both
190         // should be same. Need to normalize.
191         if (Message.MessageType.ANNOUNCE == m.getType())
192           updateRoutingTable(m);
193       } catch (IOException | ClassNotFoundException e) {
194         e.printStackTrace();
195       }
196     }
197
198   }
199
200   /**
201    * @throws InterruptedException
202    * @throws ExecutionException
203    */
204   private void handleRpcCall() throws InterruptedException, ExecutionException {
205     try {
206       Message request = parseMessage(replySocket);
207
208       _logger.debug("Received rpc request [{}]", request);
209
210       // Call broker to process the message then reply
211       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
212           (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
213
214       RpcResult<CompositeNode> result = rpc.get();
215
216       Message response = new Message.MessageBuilder()
217           .type(MessageType.RESPONSE)
218           .sender(localIp + ":" + rpcPort)
219           .route(request.getRoute())
220           //.payload(result)    TODO: enable and test
221           .build();
222
223       replySocket.send(Message.serialize(response));
224
225       _logger.debug("Sent rpc response [{}]", response);
226
227     } catch (IOException ex) {
228       //TODO: handle exception and send error codes to caller
229       ex.printStackTrace();
230     }
231   }
232
233
234   @Override
235   public Future<RpcReply<Object>> sendRpc(
236       final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
237
238     return handlersPool.submit(new Callable<RpcReply<Object>>() {
239
240       @Override
241       public RpcReply<Object> call() {
242         ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
243
244         // TODO pick the ip and port from routing table based on routing identifier
245         requestSocket.connect("tcp://" + pubIp + ":5554");
246
247         Message requestMessage = new Message.MessageBuilder()
248             .type(MessageType.REQUEST)
249             .sender(localIp + ":" + rpcPort)
250             .route(input.getRoutingInformation())
251             .payload(input.getPayload())
252             .build();
253
254         _logger.debug("Sending rpc request [{}]", requestMessage);
255
256         RpcReply<Object> reply = null;
257
258         try {
259
260           requestSocket.send(Message.serialize(requestMessage));
261           final Message response = parseMessage(requestSocket);
262
263           _logger.debug("Received response [{}]", response);
264
265           reply = new RpcReply<Object>() {
266
267             @Override
268             public Object getPayload() {
269               return response.getPayload();
270             }
271           };
272         } catch (IOException ex) {
273           // TODO: Pass exception back to the caller
274           ex.printStackTrace();
275         }
276
277         return reply;
278       }
279     });
280   }
281
282   /**
283    * TODO: Remove this implementation and use RoutingTable implementation to send announcements
284    * Publishes a notice to other controllers in the cluster
285    *
286    * @param notice
287    */
288   public void publish(final Message notice) {
289     Runnable task = new Runnable() {
290       public void run() {
291
292         try {
293
294           publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
295           publisher.send(Message.serialize(notice));
296           _logger.debug("Announcement sent [{}]", notice);
297         } catch (IOException ex) {
298           _logger.error("Error in sending announcement [{}]", notice);
299           ex.printStackTrace();
300         }
301       }
302     };
303     handlersPool.execute(task);
304   }
305
306   /**
307    * Finds IPv4 address of the local VM
308    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
309    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
310    * Should we use IP or hostname?
311    *
312    * @return
313    */
314   private String getLocalIpAddress() {
315     String hostAddress = null;
316     Enumeration e = null;
317     try {
318       e = NetworkInterface.getNetworkInterfaces();
319     } catch (SocketException e1) {
320       e1.printStackTrace();
321     }
322     while (e.hasMoreElements()) {
323
324       NetworkInterface n = (NetworkInterface) e.nextElement();
325
326       Enumeration ee = n.getInetAddresses();
327       while (ee.hasMoreElements()) {
328         InetAddress i = (InetAddress) ee.nextElement();
329         if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
330           hostAddress = i.getHostAddress();
331       }
332     }
333     return hostAddress;
334
335   }
336
337   /**
338    * TODO: Change to use external routing table implementation
339    *
340    * @param msg
341    */
342   private void updateRoutingTable(Message msg) {
343     routingTable.put(msg.getRoute(), msg.getSender());
344     RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
345
346     // Currently only registers rpc implementation.
347     // TODO: do registration for instance based routing
348     QName rpcType = route.getType();
349     RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
350     _logger.debug("Routing table updated");
351   }
352
353   /**
354    * @param socket
355    * @return
356    */
357   private Message parseMessage(ZMQ.Socket socket) {
358
359     Message msg = null;
360     try {
361       byte[] bytes = socket.recv();
362       _logger.debug("Received bytes:[{}]", bytes.length);
363       msg = (Message) Message.deserialize(bytes);
364     } catch (Throwable t) {
365       t.printStackTrace();
366     }
367     return msg;
368   }
369
370   private class RpcFacade implements RpcImplementation {
371
372     @Override
373     public Set<QName> getSupportedRpcs() {
374       return Collections.emptySet();
375     }
376
377     @Override
378     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
379
380       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
381       routeId.setType(rpc);
382
383       RpcRequestImpl request = new RpcRequestImpl();
384       request.setRouteIdentifier(routeId);
385       request.setPayload(input);
386
387       final Future<RpcReply<Object>> ret = sendRpc(request);
388
389       //TODO: Review result handling
390       RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
391         @Override
392         public boolean isSuccessful() {
393           try {
394             ret.get();
395           } catch (InterruptedException | ExecutionException e) {
396             e.printStackTrace();
397             return false;
398           }
399           return true;
400         }
401
402         @Override
403         public CompositeNode getResult() {
404           return null;
405         }
406
407         @Override
408         public Collection<RpcError> getErrors() {
409           return Collections.EMPTY_LIST;
410         }
411       };
412       return result;
413     }
414   }
415
416   /**
417    * Listener for rpc registrations
418    */
419   private class RpcListener implements RpcRegistrationListener {
420
421     @Override
422     public void onRpcImplementationAdded(QName name) {
423
424       _logger.debug("Announcing registration for [{}]", name);
425       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
426       routeId.setType(name);
427
428       //TODO: Make notice immutable and change message type
429       Message notice = new Message.MessageBuilder()
430           .type(MessageType.ANNOUNCE)
431           .sender("tcp://" + localIp + ":" + rpcPort)
432           .route(routeId)
433           .build();
434
435       publish(notice);
436     }
437
438     @Override
439     public void onRpcImplementationRemoved(QName name) {
440       // TODO: send a rpc-deregistrtation notice
441
442     }
443   }
444
445   public void setBrokerSession(ProviderSession session) {
446     this.brokerSession = session;
447
448   }
449
450 }