2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc.utils;
10 import junit.framework.Assert;
11 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.zeromq.ZMQ;
16 import java.io.IOException;
17 import java.io.PrintWriter;
18 import java.io.StringWriter;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.FutureTask;
22 import java.util.concurrent.TimeUnit;
/**
 * Collection of static helpers that build ZeroMQ-based {@link Runnable}s
 * (a reply server, a router/dealer bridge, a request sender) and that
 * shut down a ZMQ context.
 * NOTE(review): the junit Assert usage and the "for test" log message suggest
 * this is test-support code — confirm.
 */
24 public class MessagingUtil {
// Shared SLF4J logger for all helpers in this class.
26 private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class);
/**
 * Builds a Runnable that serves requests on a ZeroMQ REP socket bound to
 * {@code "tcp://" + serverAddress}, looping until its thread is interrupted.
 *
 * @param context ZMQ context used to create the REP socket
 * @param serverAddress address appended to the {@code tcp://} scheme when binding
 * @param numRequests not read by any statement visible here; the loop that
 *        used it is commented out
 * @return a Runnable wrapping the reply-server logic
 */
28 public static Runnable startReplyServer(final ZMQ.Context context,
29 final String serverAddress,
30 final int numRequests /*number of requests after which server shuts down*/) {
31 return new Runnable() {
// REP socket: the reply side of a request/reply exchange.
35 final ZMQ.Socket socket = context.socket(ZMQ.REP);
// Bind over TCP to the caller-supplied address.
37 int returnCode = socket.bind("tcp://" + serverAddress);
// NOTE(review): assertNotSame is a reference-identity check on the boxed
// arguments, not a numeric comparison — confirm it guards against -1 as intended.
38 Assert.assertNotSame(-1, returnCode);
39 _logger.info(" Starting reply server[{}] for test...", serverAddress);
// The request-count-bounded loop below is disabled; the active loop runs
// until the current thread's interrupt flag is set.
41 //for (int i=0;i<numRequests;i++) {
42 while (!Thread.currentThread().isInterrupted()) {
// Receive the next request payload.
43 byte[] bytes = socket.recv();
44 _logger.debug(" Got request ");
// NOTE(review): no send call is visible between these two log statements
// in this view — confirm the reply is actually sent.
46 _logger.debug(" Sent response ");
// Any exception ends the loop; a string-backed writer is prepared and its
// contents are logged at debug level.
// NOTE(review): the statement that fills the writer is not visible in this view.
48 } catch (Exception x) {
49 StringWriter w = new StringWriter();
50 PrintWriter p = new PrintWriter(w);
52 _logger.debug(w.toString());
55 _logger.info("Shutting down reply server");
/**
 * Builds a Runnable that creates a ROUTER socket and a DEALER socket, binds
 * both, and hands them to {@code ZMQ.proxy}.
 *
 * @param context ZMQ context used to create both sockets
 * @param dealerAddress address passed unchanged to the DEALER socket's bind
 * @param routerPort TCP port the ROUTER socket binds on ({@code tcp://*:port})
 * @return a Runnable wrapping the bridge logic
 */
61 public static Runnable createRouterDealerBridge(final ZMQ.Context context, final String dealerAddress, final int routerPort) {
62 return new Runnable() {
// Declared up front and initialised to null so the finally clause can
// close whichever sockets were actually created.
65 ZMQ.Socket router = null;
66 ZMQ.Socket dealer = null;
68 router = context.socket(ZMQ.ROUTER);
69 dealer = context.socket(ZMQ.DEALER);
// ROUTER binds on the given port on all interfaces.
70 router.bind("tcp://*:" + routerPort);
71 dealer.bind(dealerAddress);
// Connect the two sockets through the proxy; no capture socket is supplied (null).
72 ZMQ.proxy(router, dealer, null);
// Exceptions are deliberately ignored; both sockets are closed on the way
// out if they were created.
73 } catch (Exception e) {/*Ignore*/} finally {
74 if (router != null) router.close();
75 if (dealer != null) dealer.close();
/**
 * Builds a Runnable that connects a REQ socket to {@code serverAddress},
 * sends a serialized empty {@code Message}, receives the reply, deserializes
 * it, and prints the result to standard output.
 *
 * NOTE(review): the checked exceptions declared on this factory method are
 * not thrown by any statement visible here (the visible code catches them
 * inside the Runnable) — confirm whether the throws clause is still needed.
 *
 * @param context ZMQ context used to create the REQ socket
 * @param serverAddress endpoint passed unchanged to {@code socket.connect}
 * @return a Runnable wrapping the request logic
 */
81 public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
82 throws IOException, ClassNotFoundException, InterruptedException {
84 return new Runnable() {
// REQ socket: the request side of a request/reply exchange.
87 final ZMQ.Socket socket = context.socket(ZMQ.REQ);
90 socket.connect(serverAddress);
91 System.out.println(Thread.currentThread().getName() + " Sending message");
// Send a default-constructed Message in its serialized form.
93 socket.send(Message.serialize(new Message()));
// NOTE(review): the body of this handler is not visible in this view.
94 } catch (IOException e) {
// Receive the reply payload.
97 byte[] bytes = socket.recv();
// Stays null unless deserialization assigns it; it is printed as-is below.
98 Message response = null;
100 response = (Message) Message.deserialize(bytes);
// NOTE(review): the bodies of these handlers are not visible in this view.
101 } catch (IOException e) {
103 } catch (ClassNotFoundException e) {
106 System.out.println(Thread.currentThread().getName() + " Got response " + response);
// Catch-all for anything else raised in the surrounding try; handler body not visible here.
107 } catch (Exception x) {
/**
 * Creates a new {@code Message} using its no-argument constructor.
 *
 * @return a freshly constructed Message
 */
116 public static Message createEmptyMessage() {
117 return new Message();
121 * Closes ZMQ Context. It tries to gracefully terminate the context. If
122 * termination takes more than a second, it is forcefully shut down.
124 public static void closeZmqContext(final ZMQ.Context context) {
// Nothing to close when no context was supplied.
125 if (context == null) return;
// A dedicated single-thread executor runs the termination task so this
// method can bound how long it waits for it.
127 ExecutorService exec = Executors.newSingleThreadExecutor();
// Raw (unparameterized) FutureTask wrapping a Runnable.
// NOTE(review): the statements of the task body before the log line are not
// visible in this view — presumably the context is terminated there; confirm.
128 FutureTask zmqTermination = new FutureTask(new Runnable() {
135 _logger.debug("ZMQ Context terminated gracefully!");
136 } catch (Exception e) {/*Ignore and continue shutdown*/}
140 exec.execute(zmqTermination);
// Wait at most one second for the task; any exception from get (including a
// timeout) is caught and only logged.
// NOTE(review): no executor shutdown is visible in this view — confirm the
// executor thread is released after this point.
143 zmqTermination.get(1L, TimeUnit.SECONDS);
144 } catch (Exception e) {
145 _logger.debug("ZMQ Context terminated forcefully!");