Merge "BUG-509: remove unused code"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / test / java / org / opendaylight / controller / sal / connector / remoterpc / utils / MessagingUtil.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc.utils;
9
10 import java.io.IOException;
11 import java.io.PrintWriter;
12 import java.io.StringWriter;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.FutureTask;
16 import java.util.concurrent.TimeUnit;
17
18 import junit.framework.Assert;
19
20 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import org.zeromq.ZMQ;
24
25 public class MessagingUtil {
26
27   private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class);
28
29   public static Runnable startReplyServer(final ZMQ.Context context,
30                                           final String serverAddress,
31                                           final int numRequests /*number of requests after which server shuts down*/) {
32     return new Runnable() {
33
34       @Override
35       public void run() {
36         final ZMQ.Socket socket = context.socket(ZMQ.REP);
37         try {
38           int returnCode = socket.bind("tcp://" + serverAddress);
39           Assert.assertNotSame(-1, returnCode);
40           _logger.info(" Starting reply server[{}] for test...", serverAddress);
41
42           //for (int i=0;i<numRequests;i++) {
43           while (!Thread.currentThread().isInterrupted()) {
44             byte[] bytes = socket.recv();
45             _logger.debug(" Got request ");
46             socket.send(bytes);
47             _logger.debug(" Sent response ");
48           }
49         } catch (Exception x) {
50           StringWriter w = new StringWriter();
51           PrintWriter p = new PrintWriter(w);
52           x.printStackTrace(p);
53           _logger.debug(w.toString());
54         } finally {
55           socket.close();
56           _logger.info("Shutting down reply server");
57         }
58       }
59     };
60   }
61
62   public static Runnable createRouterDealerBridge(final ZMQ.Context context, final String dealerAddress, final int routerPort) {
63     return new Runnable() {
64       @Override
65       public void run() {
66         ZMQ.Socket router = null;
67         ZMQ.Socket dealer = null;
68         try {
69           router = context.socket(ZMQ.ROUTER);
70           dealer = context.socket(ZMQ.DEALER);
71           router.bind("tcp://*:" + routerPort);
72           dealer.bind(dealerAddress);
73           ZMQ.proxy(router, dealer, null);
74         } catch (Exception e) {/*Ignore*/} finally {
75           if (router != null) router.close();
76           if (dealer != null) dealer.close();
77         }
78       }
79     };
80   }
81
82   public static Runnable sendAMessage(final ZMQ.Context context, final String serverAddress, final Message msg)
83       throws IOException, ClassNotFoundException, InterruptedException {
84
85     return new Runnable() {
86       @Override
87       public void run() {
88         final ZMQ.Socket socket = context.socket(ZMQ.REQ);
89         try {
90
91           socket.connect(serverAddress);
92           System.out.println(Thread.currentThread().getName() + " Sending message");
93           try {
94             socket.send(Message.serialize(msg));
95           } catch (IOException e) {
96             e.printStackTrace();
97           }
98           byte[] bytes = socket.recv();
99           Message response = null;
100           try {
101             response = (Message) Message.deserialize(bytes);
102           } catch (IOException e) {
103             e.printStackTrace();
104           } catch (ClassNotFoundException e) {
105             e.printStackTrace();
106           }
107           System.out.println(Thread.currentThread().getName() + " Got response " + response);
108         } catch (Exception x) {
109           x.printStackTrace();
110         } finally {
111           socket.close();
112         }
113       }
114     };
115   }
116
117   public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
118           throws IOException, ClassNotFoundException, InterruptedException {
119
120     return new Runnable() {
121       @Override
122       public void run() {
123         final ZMQ.Socket socket = context.socket(ZMQ.REQ);
124         try {
125
126           socket.connect(serverAddress);
127           System.out.println(Thread.currentThread().getName() + " Sending message");
128           try {
129             socket.send(Message.serialize(new Message()));
130           } catch (IOException e) {
131             e.printStackTrace();
132           }
133           byte[] bytes = socket.recv();
134           Message response = null;
135           try {
136             response = (Message) Message.deserialize(bytes);
137           } catch (IOException e) {
138             e.printStackTrace();
139           } catch (ClassNotFoundException e) {
140             e.printStackTrace();
141           }
142           System.out.println(Thread.currentThread().getName() + " Got response " + response);
143         } catch (Exception x) {
144           x.printStackTrace();
145         } finally {
146           socket.close();
147         }
148       }
149     };
150   }
151
152   public static Message createEmptyMessage() {
153     return new Message();
154   }
155
156   /**
157    * Closes ZMQ Context. It tries to gracefully terminate the context. If
158    * termination takes more than a second, its forcefully shutdown.
159    */
160   public static void closeZmqContext(final ZMQ.Context context) {
161     if (context == null) return;
162
163     ExecutorService exec = Executors.newSingleThreadExecutor();
164     FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
165
166       @Override
167       public void run() {
168         try {
169           if (context != null)
170             context.term();
171             _logger.debug("ZMQ Context terminated gracefully!");
172         } catch (Exception e) {/*Ignore and continue shutdown*/}
173       }
174     }, null);
175
176     exec.execute(zmqTermination);
177
178     try {
179       zmqTermination.get(1L, TimeUnit.SECONDS);
180     } catch (Exception e) {
181       _logger.debug("ZMQ Context terminated forcefully!");
182     }
183
184     exec.shutdownNow();
185   }
186 }