ClusteredDataStoreManager unit tests
[controller.git] / opendaylight / md-sal / sal-zeromq-connector / src / main / java / org / opendaylight / controller / sal / connector / zeromq / ZeroMqRpcRouter.java
1 package org.opendaylight.controller.sal.connector.zeromq;
2
3 import org.opendaylight.controller.sal.connector.api.RpcRouter;
4 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
5 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
6 import org.opendaylight.controller.sal.core.api.RpcImplementation;
7 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
8 import org.opendaylight.yangtools.yang.common.QName;
9 import org.opendaylight.yangtools.yang.common.RpcResult;
10 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
11 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
12 import org.zeromq.ZMQ;
13
14 import java.io.IOException;
15 import java.net.Inet4Address;
16 import java.net.InetAddress;
17 import java.net.NetworkInterface;
18 import java.net.SocketException;
19 import java.util.*;
20 import java.util.concurrent.*;
21
22 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
23
24   private ExecutorService serverPool;
25   private static ExecutorService handlersPool;
26
27   private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
28
29   private ProviderSession brokerSession;
30
31   private ZMQ.Context context;
32   private ZMQ.Socket publisher;
33   private ZMQ.Socket subscriber;
34   private ZMQ.Socket replySocket;
35
36   private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
37
38   private final RpcFacade facade = new RpcFacade();
39   private final RpcListener listener = new RpcListener();
40
41   private String pubPort = System.getProperty("pub.port");//port on which announcements are sent
42   private String subPort = System.getProperty("sub.port");//other controller's pub port
43   private String pubIp = System.getProperty("pub.ip");    //other controller's ip
44   private String rpcPort = System.getProperty("rpc.port");//port on which RPC messages are received
45
46
47   private ZeroMqRpcRouter() {
48   }
49
50   public static ZeroMqRpcRouter getInstance() {
51     return _instance;
52   }
53
54   public void start() {
55     context = ZMQ.context(2);
56     serverPool = Executors.newSingleThreadExecutor();
57     handlersPool = Executors.newCachedThreadPool();
58     routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
59
60     // Start listening for announce and rpc messages
61     serverPool.execute(receive());
62
63     
64     brokerSession.addRpcRegistrationListener(listener);
65     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
66     for(QName rpc : currentlySupported) {
67         listener.onRpcImplementationAdded(rpc);
68     }
69
70
71   }
72
73   public void stop() {
74     if (handlersPool != null) handlersPool.shutdown();
75     if (serverPool != null) serverPool.shutdown();
76     if (publisher != null) {
77       publisher.setLinger(0);
78       publisher.close();
79     }
80     if (replySocket != null) {
81       replySocket.setLinger(0);
82       replySocket.close();
83     }
84     if (subscriber != null) {
85       subscriber.setLinger(0);
86       subscriber.close();
87     }
88     if (context != null) context.term();
89
90
91   }
92
93   private Runnable receive() {
94     return new Runnable() {
95       public void run() {
96         try {
97           // Bind to RPC reply socket
98           replySocket = context.socket(ZMQ.REP);
99           replySocket.bind("tcp://*:" + rpcPort);
100
101           // Bind to publishing controller
102           subscriber = context.socket(ZMQ.SUB);
103           subscriber.connect("tcp://" + pubIp + ":" + subPort);
104           System.out.println("Subscribing at[" + "tcp://" + pubIp + ":" + subPort + "]");
105
106           subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
107
108           // Initialize poll set
109           ZMQ.Poller poller = new ZMQ.Poller(2);
110           poller.register(replySocket, ZMQ.Poller.POLLIN);
111           poller.register(subscriber, ZMQ.Poller.POLLIN);
112
113           while (!Thread.currentThread().isInterrupted()) {
114
115             poller.poll(250);
116             //TODO: Fix this
117             if (poller.pollin(0)) {
118               //receive rpc request and reply
119               try {
120                 Message req = parseMessage(replySocket);
121                 Message resp = new Message();
122                 //Call broker to process the message then reply
123                 Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc((QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
124                 RpcResult<CompositeNode> result = rpc.get();
125                 resp.setType(Message.MessageType.RESPONSE);
126                 resp.setSender(getLocalIpAddress() + ":" + rpcPort);
127                 resp.setRoute(req.getRoute());
128                 resp.setPayload(result.isSuccessful());
129                 replySocket.send(Message.serialize(resp));
130
131               } catch (IOException ex) {// | ClassNotFoundException ex) {
132                 System.out.println("Rpc request could not be handled" + ex);
133               }
134             }
135             if (poller.pollin(1)) {
136               //get subscription and update routing table
137               //try {
138               Message.MessageType topic = (Message.MessageType)Message.deserialize(subscriber.recv());
139               System.out.println("Topic:[" + topic + "]");
140
141               if (subscriber.hasReceiveMore()) {
142                 try {
143                   Message m = (Message) Message.deserialize(subscriber.recv());
144                   System.out.println(m);
145                   //TODO: check on msg type or topic. Both should be same. Need to normalize.
146                   if (Message.MessageType.ANNOUNCE == m.getType()) updateRoutingTable(m);
147                 } catch (IOException | ClassNotFoundException e) {
148                   e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
149                 }
150               }
151 //
152             }
153           }
154         } catch (Exception e) {
155           e.printStackTrace();
156         }
157         replySocket.setLinger(0);
158         replySocket.close();
159         subscriber.setLinger(0);
160         subscriber.close();
161       }
162     };
163   }
164
165   private void updateRoutingTable(Message msg) {
166     routingTable.put(msg.getRoute(), msg.getSender());
167     RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
168     QName rpcType = route.getType();
169     System.out.println("Routing Table\n" + routingTable);
170
171     RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
172   }
173
174   private Message parseMessage(ZMQ.Socket socket) {
175     //Message m = new Message();
176     //socket.setReceiveBufferSize(40000);
177     Message msg = null;
178     try {
179       byte[] bytes = socket.recv();
180       System.out.println("Received bytes:[" + bytes.length + "]");
181       msg = (Message) Message.deserialize(bytes);
182     } catch (Throwable t) {
183       System.out.println("Caught exception");
184       t.printStackTrace();
185     }
186     return msg;
187     /*m.setType((Message.MessageType) Message.deserialize(socket.recv()));
188
189     if (socket.hasReceiveMore()) {
190       m.setSender((String) Message.deserialize(socket.recv()));
191     }
192     if (socket.hasReceiveMore()) {
193       m.setRoute((RouteIdentifier) Message.deserialize(socket.recv()));
194     }
195     if (socket.hasReceiveMore()) {
196       m.setPayload(Message.deserialize(socket.recv()));
197     }
198     return m;*/
199   }
200
201   @Override
202   public Future<RpcReply<Object>> sendRpc(final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
203
204     return handlersPool.submit(new Callable<RpcReply<Object>>() {
205
206       @Override
207       public RpcReply<Object> call() {
208         ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
209         Message req = new Message();
210         Message resp = null;
211         RpcReplyImpl reply = new RpcReplyImpl();
212         requestSocket.connect((String) routingTable.get(input.getRoutingInformation().getRoute()));
213
214         req.setType(Message.MessageType.REQUEST);
215         req.setSender(getLocalIpAddress() + ":" + rpcPort);
216         req.setRoute(input.getRoutingInformation());
217         req.setPayload(input.getPayload());
218         try {
219           requestSocket.send(Message.serialize(req));
220           resp = parseMessage(requestSocket);
221           reply.setPayload(resp.getPayload());
222         } catch (IOException ex) {//| ClassNotFoundException ex) {
223           //Log and ignore
224           System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
225         }
226
227         return reply;
228       }
229     });
230   }
231
232   public void publish(final Message message) {
233     Runnable task = new Runnable() {
234       public void run() {
235         // Bind to publishing port
236         publisher = context.socket(ZMQ.PUB);
237         publisher.bind("tcp://*:" + pubPort);
238         System.out.println("Publisher started at port[" + pubPort + "]");
239         try {
240           Message outMessage =  new Message();
241           outMessage.setType(Message.MessageType.ANNOUNCE);
242           outMessage.setSender("tcp://" + getLocalIpAddress() + ":" + rpcPort);
243           outMessage.setRoute(message.getRoute());
244
245           System.out.println("Sending announcement[" + outMessage + "]");
246           publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
247           publisher.send(Message.serialize(outMessage));
248
249         } catch (IOException ex) {
250           //Log and ignore
251           System.out.println("Error in publishing");
252           ex.printStackTrace();
253         }
254         System.out.println("Published message[" + message + "]");
255         publisher.close();
256       }
257     };
258     handlersPool.execute(task);
259   }
260
261   private String getLocalIpAddress() {
262     String hostAddress = null;
263     Enumeration e = null;
264     try {
265       e = NetworkInterface.getNetworkInterfaces();
266     } catch (SocketException e1) {
267       e1.printStackTrace();
268     }
269     while (e.hasMoreElements()) {
270
271       NetworkInterface n = (NetworkInterface) e.nextElement();
272       Enumeration ee = n.getInetAddresses();
273       while (ee.hasMoreElements()) {
274         InetAddress i = (InetAddress) ee.nextElement();
275         if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
276           hostAddress = i.getHostAddress();
277       }
278     }
279
280     return hostAddress;
281
282
283   }
284
285
286   private class RpcFacade implements RpcImplementation {
287
288
289     @Override
290     public Set<QName> getSupportedRpcs() {
291       return Collections.emptySet();
292     }
293
294     @Override
295     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
296
297       RpcRequestImpl request = new RpcRequestImpl();
298       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
299       routeId.setContext(null);
300       routeId.setRoute(null);
301       routeId.setType(rpc);
302
303       request.setRouteIdentifier(routeId);
304       request.setPayload(input);
305       // Create message
306
307       Future<org.opendaylight.controller.sal.connector.api.RpcRouter.RpcReply<Object>> ret = sendRpc(request);
308
309       return null;
310     }
311   }
312
313   private class RpcListener implements RpcRegistrationListener {
314
315     @Override
316     public void onRpcImplementationAdded(QName name) {
317
318       Message msg = new Message();
319       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
320       routeId.setType(name);
321       msg.setRoute(routeId);
322       publish(msg);
323     }
324
325     @Override
326     public void onRpcImplementationRemoved(QName name) {
327       // TODO Auto-generated method stub
328
329     }
330   }
331
332   public void setBrokerSession(ProviderSession session) {
333     this.brokerSession = session;
334
335   }
336
337 }