1 package org.opendaylight.controller.sal.connector.remoterpc;
3 import junit.framework.Assert;
4 import org.junit.After;
5 import org.junit.Before;
7 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
8 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.zeromq.ZMQ;
13 import java.io.IOException;
14 import java.util.concurrent.*;
19 public class ClientRequestHandlerTest {
21 private final Logger _logger = LoggerFactory.getLogger(ClientRequestHandlerTest.class);
24 ExecutorService serverThread;
25 final String SERVER_ADDRESS = "localhost:5553";
27 ClientRequestHandler handler;
30 public void setUp() throws Exception {
31 context = ZMQ.context(1);
32 serverThread = Executors.newCachedThreadPool();
33 handler = new ClientRequestHandler(context);
37 public void tearDown() throws Exception {
38 serverThread.shutdown();
39 MessagingUtil.closeZmqContext(context);
44 public void handle_SingleRemote_ShouldReturnResponse() throws Exception {
45 serverThread.execute(MessagingUtil.startReplyServer(context, SERVER_ADDRESS, 1));
46 Message request = new Message();
47 request.setRecipient(SERVER_ADDRESS);
48 Message response = handleMessageWithTimeout(request);
49 Assert.assertNotNull(response);
50 //should be connected to only 1 remote server
51 Assert.assertEquals(1, handler.getWorkerCount());
52 Assert.assertEquals(response.getRecipient(), SERVER_ADDRESS);
56 public void handle_MultiRemote_ShouldReturnResponses() throws Exception {
57 ExecutorService threadPool = Executors.newCachedThreadPool();
58 final int port = 5555;
59 String serverAddress = null;
60 for (int i = 0; i < 5; i++) {
61 serverAddress = "localhost:" + (port + i);
62 serverThread.execute(MessagingUtil.startReplyServer(context, serverAddress, 1));
63 threadPool.execute(createEmptyMessageTaskAndHandle(handler, serverAddress));
65 Thread.currentThread().sleep(5000);//wait for all messages to get processed
66 //should be connected to 5 remote server
67 Assert.assertEquals(5, handler.getWorkerCount());
70 private Runnable createEmptyMessageTaskAndHandle(final ClientRequestHandler handler, final String serverAddress) {
72 return new Runnable() {
75 Message request = new Message();
76 request.setRecipient(serverAddress);
78 Message response = handleMessageWithTimeout(request);
79 Assert.assertNotNull(response);
80 Assert.assertEquals(response.getRecipient(), serverAddress);
81 } catch (Exception e) {
82 throw new RuntimeException(e);
88 private Message handleMessageWithTimeout(final Message request) {
89 Message response = null;
91 FutureTask task = new FutureTask(new Callable<Message>() {
94 public Message call() {
96 return handler.handle(request);
97 } catch (Exception e) {
98 _logger.debug("Client handler failed to handle request. Exception is [{}]", e);
104 serverThread.execute(task);
107 response = (Message) task.get(5L, TimeUnit.SECONDS); //wait for max 5 sec for server to respond
108 } catch (Exception e) {/*ignore and continue*/}