2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc.utils;
10 import java.io.IOException;
11 import java.io.PrintWriter;
12 import java.io.StringWriter;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.FutureTask;
16 import java.util.concurrent.TimeUnit;
18 import junit.framework.Assert;
20 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import org.zeromq.ZMQ;
25 public class MessagingUtil {
27 private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class);
29 public static Runnable startReplyServer(final ZMQ.Context context,
30 final String serverAddress,
31 final int numRequests /*number of requests after which server shuts down*/) {
32 return new Runnable() {
36 final ZMQ.Socket socket = context.socket(ZMQ.REP);
38 int returnCode = socket.bind("tcp://" + serverAddress);
39 Assert.assertNotSame(-1, returnCode);
40 _logger.info(" Starting reply server[{}] for test...", serverAddress);
42 //for (int i=0;i<numRequests;i++) {
43 while (!Thread.currentThread().isInterrupted()) {
44 byte[] bytes = socket.recv();
45 _logger.debug(" Got request ");
47 _logger.debug(" Sent response ");
49 } catch (Exception x) {
50 StringWriter w = new StringWriter();
51 PrintWriter p = new PrintWriter(w);
53 _logger.debug(w.toString());
56 _logger.info("Shutting down reply server");
62 public static Runnable createRouterDealerBridge(final ZMQ.Context context, final String dealerAddress, final int routerPort) {
63 return new Runnable() {
66 ZMQ.Socket router = null;
67 ZMQ.Socket dealer = null;
69 router = context.socket(ZMQ.ROUTER);
70 dealer = context.socket(ZMQ.DEALER);
71 router.bind("tcp://*:" + routerPort);
72 dealer.bind(dealerAddress);
73 ZMQ.proxy(router, dealer, null);
74 } catch (Exception e) {/*Ignore*/} finally {
75 if (router != null) router.close();
76 if (dealer != null) dealer.close();
82 public static Runnable sendAMessage(final ZMQ.Context context, final String serverAddress, final Message msg)
83 throws IOException, ClassNotFoundException, InterruptedException {
85 return new Runnable() {
88 final ZMQ.Socket socket = context.socket(ZMQ.REQ);
91 socket.connect(serverAddress);
92 System.out.println(Thread.currentThread().getName() + " Sending message");
94 socket.send(Message.serialize(msg));
95 } catch (IOException e) {
98 byte[] bytes = socket.recv();
99 Message response = null;
101 response = (Message) Message.deserialize(bytes);
102 } catch (IOException e) {
104 } catch (ClassNotFoundException e) {
107 System.out.println(Thread.currentThread().getName() + " Got response " + response);
108 } catch (Exception x) {
117 public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
118 throws IOException, ClassNotFoundException, InterruptedException {
120 return new Runnable() {
123 final ZMQ.Socket socket = context.socket(ZMQ.REQ);
126 socket.connect(serverAddress);
127 System.out.println(Thread.currentThread().getName() + " Sending message");
129 socket.send(Message.serialize(new Message()));
130 } catch (IOException e) {
133 byte[] bytes = socket.recv();
134 Message response = null;
136 response = (Message) Message.deserialize(bytes);
137 } catch (IOException e) {
139 } catch (ClassNotFoundException e) {
142 System.out.println(Thread.currentThread().getName() + " Got response " + response);
143 } catch (Exception x) {
152 public static Message createEmptyMessage() {
153 return new Message();
157 * Closes ZMQ Context. It tries to gracefully terminate the context. If
158 * termination takes more than a second, its forcefully shutdown.
160 public static void closeZmqContext(final ZMQ.Context context) {
161 if (context == null) return;
163 ExecutorService exec = Executors.newSingleThreadExecutor();
164 FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
171 _logger.debug("ZMQ Context terminated gracefully!");
172 } catch (Exception e) {/*Ignore and continue shutdown*/}
176 exec.execute(zmqTermination);
179 zmqTermination.get(1L, TimeUnit.SECONDS);
180 } catch (Exception e) {
181 _logger.debug("ZMQ Context terminated forcefully!");