2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq;
10 import java.io.IOException;
11 import java.net.Inet4Address;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Enumeration;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.Future;
27 import org.opendaylight.controller.sal.connector.api.RpcRouter;
28 import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType;
29 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
30 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
31 import org.opendaylight.controller.sal.core.api.RpcImplementation;
32 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
37 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.zeromq.ZMQ;
43 * ZeroMq based implementation of RpcRouter
45 * 1. Make it multi VM aware
46 * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe
47 * 3. sendRpc() should use connection pooling
48 * 4. Read properties from config file using existing(?) ODL properties framework
50 public class ZeroMqRpcRouter implements RpcRouter<QName, QName, InstanceIdentifier, Object> {
52 private ExecutorService serverPool;
53 private static ExecutorService handlersPool;
55 private Map<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
57 private ProviderSession brokerSession;
59 private ZMQ.Context context;
60 private ZMQ.Socket publisher;
61 private ZMQ.Socket subscriber;
62 private ZMQ.Socket replySocket;
64 private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter();
66 private final RpcFacade facade = new RpcFacade();
67 private final RpcListener listener = new RpcListener();
69 private final String localIp = getLocalIpAddress();
71 private String pubPort = System.getProperty("pub.port");// port on which announcements are sent
72 private String subPort = System.getProperty("sub.port");// other controller's pub port
73 private String pubIp = System.getProperty("pub.ip"); // other controller's ip
74 private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received
76 private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class);
/**
 * Private so that the class cannot be instantiated from outside; the single
 * instance is the one stored in {@code _instance}.
 */
private ZeroMqRpcRouter() {
    // nothing to initialise here
}
82 public static ZeroMqRpcRouter getInstance() {
87 context = ZMQ.context(2);
88 publisher = context.socket(ZMQ.PUB);
89 int ret = publisher.bind("tcp://*:" + pubPort);
90 // serverPool = Executors.newSingleThreadExecutor();
91 serverPool = Executors.newCachedThreadPool();
92 handlersPool = Executors.newCachedThreadPool();
93 routingTable = new ConcurrentHashMap<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
95 // Start listening for announce and rpc messages
96 serverPool.execute(receive());
98 brokerSession.addRpcRegistrationListener(listener);
100 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
101 for (QName rpc : currentlySupported) {
102 listener.onRpcImplementationAdded(rpc);
108 if (handlersPool != null)
109 handlersPool.shutdown();
110 if (serverPool != null)
111 serverPool.shutdown();
112 if (publisher != null) {
113 publisher.setLinger(0);
116 if (replySocket != null) {
117 replySocket.setLinger(0);
120 if (subscriber != null) {
121 subscriber.setLinger(0);
129 private Runnable receive() {
130 return new Runnable() {
133 // Bind to RPC reply socket
134 replySocket = context.socket(ZMQ.REP);
135 replySocket.bind("tcp://*:" + rpcPort);
137 // Bind to publishing controller
138 subscriber = context.socket(ZMQ.SUB);
139 String pubAddress = "tcp://" + pubIp + ":" + subPort;
140 subscriber.connect(pubAddress);
141 _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress);
143 //subscribe for announcements
144 //TODO: Message type would be changed. Update this
145 subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE));
147 // Poller enables listening on multiple sockets using a single thread
148 ZMQ.Poller poller = new ZMQ.Poller(2);
149 poller.register(replySocket, ZMQ.Poller.POLLIN);
150 poller.register(subscriber, ZMQ.Poller.POLLIN);
152 //TODO: Add code to restart the thread after exception
153 while (!Thread.currentThread().isInterrupted()) {
157 if (poller.pollin(0)) {
160 if (poller.pollin(1)) {
161 handleAnnouncement();
164 } catch (Exception e) {
167 replySocket.setLinger(0);
169 subscriber.setLinger(0);
176 * @throws IOException
177 * @throws ClassNotFoundException
179 private void handleAnnouncement() throws IOException, ClassNotFoundException {
181 _logger.info("Announcement received");
182 Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv());
184 if (subscriber.hasReceiveMore()) {
186 Message m = (Message) Message.deserialize(subscriber.recv());
187 _logger.debug("Announcement message [{}]", m);
189 // TODO: check on msg type or topic. Both
190 // should be same. Need to normalize.
191 if (Message.MessageType.ANNOUNCE == m.getType())
192 updateRoutingTable(m);
193 } catch (IOException | ClassNotFoundException e) {
201 * @throws InterruptedException
202 * @throws ExecutionException
204 private void handleRpcCall() throws InterruptedException, ExecutionException {
206 Message request = parseMessage(replySocket);
208 _logger.debug("Received rpc request [{}]", request);
210 // Call broker to process the message then reply
211 Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(
212 (QName) request.getRoute().getType(), (CompositeNode) request.getPayload());
214 RpcResult<CompositeNode> result = rpc.get();
216 Message response = new Message.MessageBuilder()
217 .type(MessageType.RESPONSE)
218 .sender(localIp + ":" + rpcPort)
219 .route(request.getRoute())
220 //.payload(result) TODO: enable and test
223 replySocket.send(Message.serialize(response));
225 _logger.debug("Sent rpc response [{}]", response);
227 } catch (IOException ex) {
228 //TODO: handle exception and send error codes to caller
229 ex.printStackTrace();
235 public Future<RpcReply<Object>> sendRpc(
236 final RpcRequest<QName, QName, InstanceIdentifier, Object> input) {
238 return handlersPool.submit(new Callable<RpcReply<Object>>() {
241 public RpcReply<Object> call() {
242 ZMQ.Socket requestSocket = context.socket(ZMQ.REQ);
244 // TODO pick the ip and port from routing table based on routing identifier
245 requestSocket.connect("tcp://" + pubIp + ":5554");
247 Message requestMessage = new Message.MessageBuilder()
248 .type(MessageType.REQUEST)
249 .sender(localIp + ":" + rpcPort)
250 .route(input.getRoutingInformation())
251 .payload(input.getPayload())
254 _logger.debug("Sending rpc request [{}]", requestMessage);
256 RpcReply<Object> reply = null;
260 requestSocket.send(Message.serialize(requestMessage));
261 final Message response = parseMessage(requestSocket);
263 _logger.debug("Received response [{}]", response);
265 reply = new RpcReply<Object>() {
268 public Object getPayload() {
269 return response.getPayload();
272 } catch (IOException ex) {
273 // TODO: Pass exception back to the caller
274 ex.printStackTrace();
283 * TODO: Remove this implementation and use RoutingTable implementation to send announcements
284 * Publishes a notice to other controllers in the cluster
288 public void publish(final Message notice) {
289 Runnable task = new Runnable() {
294 publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE));
295 publisher.send(Message.serialize(notice));
296 _logger.debug("Announcement sent [{}]", notice);
297 } catch (IOException ex) {
298 _logger.error("Error in sending announcement [{}]", notice);
299 ex.printStackTrace();
303 handlersPool.execute(task);
307 * Finds IPv4 address of the local VM
308 * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
309 * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
310 * Should we use IP or hostname?
314 private String getLocalIpAddress() {
315 String hostAddress = null;
316 Enumeration e = null;
318 e = NetworkInterface.getNetworkInterfaces();
319 } catch (SocketException e1) {
320 e1.printStackTrace();
322 while (e.hasMoreElements()) {
324 NetworkInterface n = (NetworkInterface) e.nextElement();
326 Enumeration ee = n.getInetAddresses();
327 while (ee.hasMoreElements()) {
328 InetAddress i = (InetAddress) ee.nextElement();
329 if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
330 hostAddress = i.getHostAddress();
338 * TODO: Change to use external routing table implementation
342 private void updateRoutingTable(Message msg) {
343 routingTable.put(msg.getRoute(), msg.getSender());
344 RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> route = msg.getRoute();
346 // Currently only registers rpc implementation.
347 // TODO: do registration for instance based routing
348 QName rpcType = route.getType();
349 RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade);
350 _logger.debug("Routing table updated");
357 private Message parseMessage(ZMQ.Socket socket) {
361 byte[] bytes = socket.recv();
362 _logger.debug("Received bytes:[{}]", bytes.length);
363 msg = (Message) Message.deserialize(bytes);
364 } catch (Throwable t) {
370 private class RpcFacade implements RpcImplementation {
373 public Set<QName> getSupportedRpcs() {
374 return Collections.emptySet();
378 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
380 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
381 routeId.setType(rpc);
383 RpcRequestImpl request = new RpcRequestImpl();
384 request.setRouteIdentifier(routeId);
385 request.setPayload(input);
387 final Future<RpcReply<Object>> ret = sendRpc(request);
389 //TODO: Review result handling
390 RpcResult<CompositeNode> result = new RpcResult<CompositeNode>() {
392 public boolean isSuccessful() {
395 } catch (InterruptedException | ExecutionException e) {
403 public CompositeNode getResult() {
408 public Collection<RpcError> getErrors() {
409 return Collections.EMPTY_LIST;
417 * Listener for rpc registrations
419 private class RpcListener implements RpcRegistrationListener {
422 public void onRpcImplementationAdded(QName name) {
424 _logger.debug("Announcing registration for [{}]", name);
425 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
426 routeId.setType(name);
428 //TODO: Make notice immutable and change message type
429 Message notice = new Message.MessageBuilder()
430 .type(MessageType.ANNOUNCE)
431 .sender("tcp://" + localIp + ":" + rpcPort)
439 public void onRpcImplementationRemoved(QName name) {
440 // TODO: send a rpc-deregistrtation notice
445 public void setBrokerSession(ProviderSession session) {
446 this.brokerSession = session;