2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
10 import java.io.IOException;
11 import java.net.Inet4Address;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Enumeration;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.Future;
27 import org.opendaylight.controller.sal.connector.api.RpcRouter;
28 import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
29 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
30 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
31 import org.opendaylight.controller.sal.core.api.RpcImplementation;
32 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
37 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
38 import org.zeromq.ZMQ;
41 * ZeroMq based implementation of RpcRouter
43 * 1. Make it multi VM aware
44 * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
45 * 3. sendRpc() should use connection pooling
46 * 4. Read properties from config file using existing(?) ODL properties framework
48 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
// Executor that runs the polling task returned by receive().
50 private ExecutorService serverPool;
// Executor used by sendRpc() and publish() for the tasks they submit; static, so shared class-wide.
51 private static ExecutorService handlersPool;
// Route identifier -> sender string, filled from ANNOUNCE messages in updateRoutingTable().
53 private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
// Broker session supplied through setBrokerSession(); used to invoke and register RPCs.
55 private ProviderSession brokerSession;
// ZeroMQ context and the three long-lived sockets: PUB for outgoing announcements, SUB for
// announcements from the other controller, REP for incoming RPC requests.
57 private ZMQ.Context context;
58 private ZMQ.Socket publisher;
59 private ZMQ.Socket subscriber;
60 private ZMQ.Socket replySocket;
// Eagerly created singleton instance; presumably what getInstance() returns — TODO confirm.
62 private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
// RpcImplementation registered with the broker for remotely announced RPCs (see updateRoutingTable()).
64 private final RpcFacade facade = new RpcFacade();
// Listener added to the broker session for RPC registration events.
65 private final RpcListener listener = new RpcListener();
// Local IPv4 address, resolved once when the instance is constructed.
67 private final String localIp = getLocalIpAddress();
// Endpoint settings come from JVM system properties; each is null when its property is not set.
69 private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
70 private String subPort = System.getProperty("sub.port");// other controller's pub port
71 private String pubIp = System.getProperty("pub.ip"); // other controller's ip
72 private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
74 //Prevent instantiation
// NOTE(review): the constructor body and its closing brace are not present in this copy of the file.
75 private ZeroMqRpcRouter() {
// Static accessor for the router. Presumably returns the _instance singleton, but the body is
// not present in this copy of the file — TODO confirm.
78 public static ZeroMqRpcRouter getInstance() {
// Start-up sequence of the router.
// NOTE(review): the header of the enclosing method is not present in this copy of the file.
// Create the ZeroMQ context (2 is presumably the I/O thread count — confirm against the ZMQ
// binding) and bind the PUB socket on all interfaces at pubPort.
83 context = ZMQ.context(2);
84 publisher = context.socket(ZMQ.PUB);
85 int ret = publisher.bind("tcp://*:" + pubPort);
86 System.out.println(Thread.currentThread().getName() + " Return(publish bind) :[" + ret + "]");
87 // serverPool = Executors.newSingleThreadExecutor();
// Both pools are unbounded cached pools; the routing table is a concurrent map because it is
// written from the polling thread.
88 serverPool = Executors.newCachedThreadPool();
89 handlersPool = Executors.newCachedThreadPool();
90 routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
92 // Start listening for announce and rpc messages
93 serverPool.execute(receive());
// Get notified about RPCs registered with the broker from now on ...
95 brokerSession.addRpcRegistrationListener(listener);
// ... and feed the RPCs the broker already supports through the same listener callback.
97 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
98 for (QName rpc : currentlySupported) {
99 listener.onRpcImplementationAdded(rpc);
// Shutdown sequence of the router.
// NOTE(review): the header of the enclosing method and the statements that follow each
// setLinger(0) call are not present in this copy of the file.
// Ask both executors to shut down; each may be null if start-up never ran.
105 if (handlersPool != null)
106 handlersPool.shutdown();
107 if (serverPool != null)
108 serverPool.shutdown();
// Set linger to 0 on every socket that was created (presumably so closing does not wait for
// unsent messages — confirm against the ZMQ binding).
109 if (publisher != null) {
110 publisher.setLinger(0);
113 if (replySocket != null) {
114 replySocket.setLinger(0);
117 if (subscriber != null) {
118 subscriber.setLinger(0);
// Builds the task that owns the REP and SUB sockets: it creates both, then polls them in a loop
// until the running thread is interrupted.
// NOTE(review): several lines of this method (run() header, try, closing braces) are not present
// in this copy of the file.
126 private Runnable receive() {
127 return new Runnable() {
130 // Bind to RPC reply socket
131 replySocket = context.socket(ZMQ.REP);
132 replySocket.bind("tcp://*:" + rpcPort);
134 // Connect to the publishing controller
135 subscriber = context.socket(ZMQ.SUB);
136 subscriber.connect("tcp://" + pubIp + ":" + subPort);
137 System.out.println(Thread.currentThread().getName() + "-Subscribing at[" + "tcp://"
138 + pubIp + ":" + subPort + "]");
140 //subscribe for announcements
141 //TODO: Message type would be changed. Update this
// The subscription filter is the serialized ANNOUNCE message type.
142 subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
144 // Poller enables listening on multiple sockets using a single thread
// Registration order fixes the poller indexes used below: 0 = replySocket, 1 = subscriber.
145 ZMQ.Poller poller = new ZMQ.Poller(2);
146 poller.register(replySocket, ZMQ.Poller.POLLIN);
147 poller.register(subscriber, ZMQ.Poller.POLLIN);
148 System.out.println(Thread.currentThread().getName() + "-Start Polling");
150 //TODO: Add code to restart the thread after exception
151 while (!Thread.currentThread().isInterrupted()) {
// Index 0: data on the reply socket (presumably dispatched to handleRpcCall() — the call is not
// present in this copy).
155 if (poller.pollin(0)) {
// Index 1: an announcement arrived on the subscriber socket.
158 if (poller.pollin(1)) {
159 handleAnnouncement();
// Any exception ends up here; linger is set to 0 on both sockets.
162 } catch (Exception e) {
165 replySocket.setLinger(0);
167 subscriber.setLinger(0);
174 * @throws IOException
175 * @throws ClassNotFoundException
// Reads one announcement from the subscriber socket. The first frame is the topic (a serialized
// MessageType); if another frame follows it is deserialized as a Message, and an ANNOUNCE
// message is applied to the routing table.
// NOTE(review): the try line and closing braces of this method are not present in this copy.
177 private void handleAnnouncement() throws IOException, ClassNotFoundException {
178 System.out.println("\n" + Thread.currentThread().getName() + "-Received message");
// Frame 1: topic.
179 Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
180 System.out.println("Topic:[" + topic + "]");
// Frame 2 (only when the message is multi-part): the announcement itself.
182 if (subscriber.hasReceiveMore()) {
184 Message m = (Message) Message.deserialize(subscriber.recv());
185 System.out.println(m);
186 // TODO: check on msg type or topic. Both
187 // should be same. Need to normalize.
188 if (Message.MessageType.ANNOUNCE == m.getType())
189 updateRoutingTable(m);
190 } catch (IOException | ClassNotFoundException e) {
198 * @throws InterruptedException
199 * @throws ExecutionException
// Serves one RPC request from the reply socket: parses the request, hands it to the broker,
// waits for the broker's result and sends a RESPONSE message back on the same socket.
// NOTE(review): the try line, the builder's terminating call and closing braces are not present
// in this copy of the file.
201 private void handleRpcCall() throws InterruptedException, ExecutionException {
203 Message req = parseMessage(replySocket);
205 System.out.println("Received RPC request [" + req + "]");
207 // Call broker to process the message then reply
208 Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
209 (QName) req.getRoute().getType(), (CompositeNode) req.getPayload());
// Blocks the calling (polling) thread until the broker has produced the result.
211 RpcResult<CompositeNode> result = rpc.get();
// The response echoes the request route; the result payload is not attached (see the
// commented-out payload line below).
213 Message response = new Message.MessageBuilder()
214 .type(MessageType.RESPONSE)
215 .sender(localIp + ":" + rpcPort)
216 .route(req.getRoute())
217 //.payload(result) TODO: enable and test
220 replySocket.send(Message.serialize(response));
222 System.out.println("Sent RPC response [" + response + "]");
224 } catch (IOException ex) {
225 //TODO: handle exception and send error codes to caller
226 System.out.println("Rpc request could not be handled" + ex);
// Sends an RPC request to a remote router and returns a Future for the reply. The work is
// submitted to handlersPool; the task opens its own REQ socket, sends the serialized request,
// waits for the response via parseMessage() and wraps the response payload in an RpcReply.
// NOTE(review): the try line, the builder's terminating call, the return statement and closing
// braces are not present in this copy of the file.
232 public Future<RpcReply<Object>> sendRpc(
233 final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
235 return handlersPool.submit(new Callable<RpcReply<Object>>() {
238 public RpcReply<Object> call() {
// A new REQ socket is created for every call.
239 ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
241 // TODO pick the ip and port from routing table based on routing identifier
// The destination is pubIp with the hard-coded port 5554; the routing table is not consulted.
242 requestSocket.connect("tcp://" + pubIp + ":5554");
244 Message requestMessage = new Message.MessageBuilder()
245 .type(MessageType.REQUEST)
246 .sender(localIp + ":" + rpcPort)
247 .route(input.getRoutingInformation())
248 .payload(input.getPayload())
251 RpcReply<Object> reply = null;
254 System.out.println("\n\nRPC Request [" + requestMessage + "]");
// Send the request, then wait on the same socket for the response.
256 requestSocket.send(Message.serialize(requestMessage));
257 final Message resp = parseMessage(requestSocket);
259 System.out.println("\n\nRPC Response [" + resp + "]");
// The reply exposes only the payload of the response message.
261 reply = new RpcReply<Object>() {
264 public Object getPayload() {
265 return resp.getPayload();
268 } catch (IOException ex) {
269 // TODO: Pass exception back to the caller
270 System.out.println("Error in RPC send. Input could not be serialized[" + input + "]");
279 * TODO: Remove this implementation and use RoutingTable implementation to send announcements
280 * Publishes a notice to other controllers in the cluster
// Publishes a notice on the PUB socket. The send runs as a task on handlersPool and emits a
// two-frame message: the serialized ANNOUNCE type as topic, followed by the serialized notice.
// NOTE(review): the publisher socket is used from pool threads here, while the class header notes
// that zmq sockets are not thread safe — worth confirming.
// NOTE(review): the run() header, try line, println openers and closing braces are not present
// in this copy of the file.
284 public void publish(final Message notice) {
285 Runnable task = new Runnable() {
289 Thread.currentThread().getName() + "-Publisher started at port[" + pubPort + "]");
294 Thread.currentThread().getName() + "-Sending announcement[" + notice + "]");
// Frame 1: topic, matching the filter the subscriber side registers in receive().
296 publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
// Frame 2: the notice itself.
297 publisher.send(Message.serialize(notice));
299 } catch (IOException ex) {
300 System.out.println("Error in publishing");
301 ex.printStackTrace();
303 System.out.println(Thread.currentThread().getName() + "-Published message[" + notice
308 handlersPool.execute(task);
312 * Finds IPv4 address of the local VM
313 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Can't say which
314 * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
315 * Should we use IP or hostname?
319 private String getLocalIpAddress() {
320 String hostAddress = null;
321 Enumeration e = null;
323 e = NetworkInterface.getNetworkInterfaces();
324 } catch (SocketException e1) {
325 e1.printStackTrace();
327 while (e.hasMoreElements()) {
329 NetworkInterface n = (NetworkInterface) e.nextElement();
331 Enumeration ee = n.getInetAddresses();
332 while (ee.hasMoreElements()) {
333 InetAddress i = (InetAddress) ee.nextElement();
334 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
335 hostAddress = i.getHostAddress();
343 * TODO: Change to use external routing table implementation
// Applies an announcement: records the announcing sender for the announced route, then registers
// the facade with the broker as the implementation of the route's RPC type.
// NOTE(review): the lines after the registration and the closing brace are not present in this
// copy of the file; within the visible lines the returned registration is not used.
347 private void updateRoutingTable(Message msg) {
348 routingTable.put(msg.getRoute(), msg.getSender());
349 RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
351 // Currently only registers rpc implementation.
352 // TODO: do registration for instance based routing
353 QName rpcType = route.getType();
354 RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
// Receives one frame from the given socket and deserializes it into a Message.
// Any Throwable raised while receiving or deserializing is caught and only a fixed text is
// printed in the visible lines.
// NOTE(review): the declaration of msg, the try line, the return statement and closing braces
// are not present in this copy of the file.
361 private Message parseMessage(ZMQ.Socket socket) {
365 byte[] bytes = socket.recv();
366 System.out.println("Received bytes:[" + bytes.length + "]");
367 msg = (Message) Message.deserialize(bytes);
368 } catch (Throwable t) {
369 System.out.println("Caught exception");
// RpcImplementation that the router registers with the broker for remotely announced RPCs
// (see updateRoutingTable()). invokeRpc() turns the call into an RpcRequest and forwards it
// through sendRpc().
// NOTE(review): most method bodies of the anonymous RpcResult, plus return statements and
// closing braces, are not present in this copy of the file.
375 private class RpcFacade implements RpcImplementation {
// Reports no RPCs of its own.
378 public Set<QName> getSupportedRpcs() {
379 return Collections.emptySet();
383 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
// Only the RPC type is set on the route identifier.
385 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
386 routeId.setType(rpc);
388 RpcRequestImpl request = new RpcRequestImpl();
389 request.setRouteIdentifier(routeId);
390 request.setPayload(input);
// The request is executed asynchronously on the handlers pool.
392 final Future<RpcReply<Object>> ret = sendRpc(request);
394 //TODO: Review result handling
395 RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
397 public boolean isSuccessful() {
400 } catch (InterruptedException | ExecutionException e) {
408 public CompositeNode getResult() {
// Always reports an empty error list (raw-typed constant).
413 public Collection<RpcError> getErrors() {
414 return Collections.EMPTY_LIST;
422 * Listener for rpc registrations
// Listener the router adds to the broker session. When an RPC implementation is added it builds
// an ANNOUNCE notice for that RPC type, with this router's RPC endpoint as sender.
// NOTE(review): the rest of the notice builder, whatever is done with the notice afterwards, and
// the closing braces are not present in this copy of the file.
424 private class RpcListener implements RpcRegistrationListener {
427 public void onRpcImplementationAdded(QName name) {
428 System.out.println("In ZeroMQ Rpc Listener onRpcImplementationAdded()");
430 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
431 routeId.setType(name);
433 //TODO: Make notice immutable and change message type
// The sender is a full endpoint string ("tcp://" + ip + ":" + port).
434 Message notice = new Message.MessageBuilder()
435 .type(MessageType.ANNOUNCE)
436 .sender("tcp://" + localIp + ":" + rpcPort)
// No visible handling for removals yet.
444 public void onRpcImplementationRemoved(QName name) {
445 // TODO: send a rpc-deregistration notice
// Stores the broker session that the router uses to register listeners and implementations and
// to invoke RPCs.
// NOTE(review): the closing brace is not present in this copy of the file.
450 public void setBrokerSession(ProviderSession session) {
451 this.brokerSession = session;