Merge "Changed codec for Identityref in JSON transformation"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / test / java / org / opendaylight / controller / sal / connector / remoterpc / utils / MessagingUtil.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc.utils;
9
10 import junit.framework.Assert;
11 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.zeromq.ZMQ;
15
16 import java.io.IOException;
17 import java.io.PrintWriter;
18 import java.io.StringWriter;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.FutureTask;
22 import java.util.concurrent.TimeUnit;
23
24 public class MessagingUtil {
25
26   private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class);
27
28   public static Runnable startReplyServer(final ZMQ.Context context,
29                                           final String serverAddress,
30                                           final int numRequests /*number of requests after which server shuts down*/) {
31     return new Runnable() {
32
33       @Override
34       public void run() {
35         final ZMQ.Socket socket = context.socket(ZMQ.REP);
36         try {
37           int returnCode = socket.bind("tcp://" + serverAddress);
38           Assert.assertNotSame(-1, returnCode);
39           _logger.info(" Starting reply server[{}] for test...", serverAddress);
40
41           //for (int i=0;i<numRequests;i++) {
42           while (!Thread.currentThread().isInterrupted()) {
43             byte[] bytes = socket.recv();
44             _logger.debug(" Got request ");
45             socket.send(bytes);
46             _logger.debug(" Sent response ");
47           }
48         } catch (Exception x) {
49           StringWriter w = new StringWriter();
50           PrintWriter p = new PrintWriter(w);
51           x.printStackTrace(p);
52           _logger.debug(w.toString());
53         } finally {
54           socket.close();
55           _logger.info("Shutting down reply server");
56         }
57       }
58     };
59   }
60
61   public static Runnable createRouterDealerBridge(final ZMQ.Context context, final String dealerAddress, final int routerPort) {
62     return new Runnable() {
63       @Override
64       public void run() {
65         ZMQ.Socket router = null;
66         ZMQ.Socket dealer = null;
67         try {
68           router = context.socket(ZMQ.ROUTER);
69           dealer = context.socket(ZMQ.DEALER);
70           router.bind("tcp://*:" + routerPort);
71           dealer.bind(dealerAddress);
72           ZMQ.proxy(router, dealer, null);
73         } catch (Exception e) {/*Ignore*/} finally {
74           if (router != null) router.close();
75           if (dealer != null) dealer.close();
76         }
77       }
78     };
79   }
80
81   public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
82           throws IOException, ClassNotFoundException, InterruptedException {
83
84     return new Runnable() {
85       @Override
86       public void run() {
87         final ZMQ.Socket socket = context.socket(ZMQ.REQ);
88         try {
89
90           socket.connect(serverAddress);
91           System.out.println(Thread.currentThread().getName() + " Sending message");
92           try {
93             socket.send(Message.serialize(new Message()));
94           } catch (IOException e) {
95             e.printStackTrace();
96           }
97           byte[] bytes = socket.recv();
98           Message response = null;
99           try {
100             response = (Message) Message.deserialize(bytes);
101           } catch (IOException e) {
102             e.printStackTrace();
103           } catch (ClassNotFoundException e) {
104             e.printStackTrace();
105           }
106           System.out.println(Thread.currentThread().getName() + " Got response " + response);
107         } catch (Exception x) {
108           x.printStackTrace();
109         } finally {
110           socket.close();
111         }
112       }
113     };
114   }
115
116   public static Message createEmptyMessage() {
117     return new Message();
118   }
119
120   /**
121    * Closes ZMQ Context. It tries to gracefully terminate the context. If
122    * termination takes more than a second, its forcefully shutdown.
123    */
124   public static void closeZmqContext(final ZMQ.Context context) {
125     if (context == null) return;
126
127     ExecutorService exec = Executors.newSingleThreadExecutor();
128     FutureTask zmqTermination = new FutureTask(new Runnable() {
129
130       @Override
131       public void run() {
132         try {
133           if (context != null)
134             context.term();
135             _logger.debug("ZMQ Context terminated gracefully!");
136         } catch (Exception e) {/*Ignore and continue shutdown*/}
137       }
138     }, null);
139
140     exec.execute(zmqTermination);
141
142     try {
143       zmqTermination.get(1L, TimeUnit.SECONDS);
144     } catch (Exception e) {
145       _logger.debug("ZMQ Context terminated forcefully!");
146     }
147
148     exec.shutdownNow();
149   }
150 }