e1345945a73dabf0951883bf62cbcde1d0518f87
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / test / java / org / opendaylight / controller / sal / connector / remoterpc / ClientRequestHandlerTest.java
1 package org.opendaylight.controller.sal.connector.remoterpc;
2
3 import junit.framework.Assert;
4 import org.junit.After;
5 import org.junit.Before;
6 import org.junit.Test;
7 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
8 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.zeromq.ZMQ;
12
13 import java.io.IOException;
14 import java.util.concurrent.*;
15
16 /**
17  *
18  */
19 public class ClientRequestHandlerTest {
20
21   private final Logger _logger = LoggerFactory.getLogger(ClientRequestHandlerTest.class);
22
23   ZMQ.Context context;
24   ExecutorService serverThread;
25   final String SERVER_ADDRESS = "localhost:5554";
26
27   ClientRequestHandler handler;
28
29   @Before
30   public void setUp() throws Exception {
31     context = ZMQ.context(1);
32     serverThread = Executors.newCachedThreadPool();
33     handler = new ClientRequestHandler(context);
34   }
35
36   @After
37   public void tearDown() throws Exception {
38     serverThread.shutdown();
39     MessagingUtil.closeZmqContext(context);
40     handler.close();
41   }
42
43  @Test
44   public void handle_SingleRemote_ShouldReturnResponse() throws Exception {
45     serverThread.execute(MessagingUtil.startReplyServer(context, SERVER_ADDRESS, 1));
46     Message request = new Message();
47     request.setRecipient(SERVER_ADDRESS);
48     Message response = handleMessageWithTimeout(request);
49     Assert.assertNotNull(response);
50     //should be connected to only 1 remote server
51     Assert.assertEquals(1, handler.getWorkerCount());
52     Assert.assertEquals(response.getRecipient(), SERVER_ADDRESS);
53   }
54
55  // @Test
56   public void handle_MultiRemote_ShouldReturnResponses() throws Exception {
57     ExecutorService threadPool = Executors.newCachedThreadPool();
58     final int port = 5555;
59     String serverAddress = null;
60     for (int i = 0; i < 5; i++) {
61       serverAddress = "localhost:" + (port + i);
62       serverThread.execute(MessagingUtil.startReplyServer(context, serverAddress, 1));
63       threadPool.execute(createEmptyMessageTaskAndHandle(handler, serverAddress));
64     }
65     Thread.currentThread().sleep(5000);//wait for all messages to get processed
66     //should be connected to 5 remote server
67     Assert.assertEquals(5, handler.getWorkerCount());
68   }
69
70   private Runnable createEmptyMessageTaskAndHandle(final ClientRequestHandler handler, final String serverAddress) {
71
72     return new Runnable() {
73       @Override
74       public void run() {
75         Message request = new Message();
76         request.setRecipient(serverAddress);
77         try {
78           Message response = handleMessageWithTimeout(request);
79           Assert.assertNotNull(response);
80           Assert.assertEquals(response.getRecipient(), serverAddress);
81         } catch (Exception e) {
82           throw new RuntimeException(e);
83         }
84       }
85     };
86   }
87
88   private Message handleMessageWithTimeout(final Message request) {
89     Message response = null;
90
91     FutureTask task = new FutureTask(new Callable<Message>() {
92
93       @Override
94       public Message call() {
95         try {
96           return handler.handle(request);
97         } catch (Exception e) {
98           _logger.debug("Client handler failed to handle request. Exception is [{}]", e);
99         }
100         return null;
101       }
102     });
103
104     serverThread.execute(task);
105
106     try {
107       response = (Message) task.get(5L, TimeUnit.SECONDS); //wait for max 5 sec for server to respond
108     } catch (Exception e) {/*ignore and continue*/}
109
110     return response;
111   }
112
113 }