Implement finding a primary based on the shard name and do basic wiring of Distribute...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / test / java / org / opendaylight / controller / sal / connector / remoterpc / ClientRequestHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.FutureTask;
14 import java.util.concurrent.TimeUnit;
15
16 import junit.framework.Assert;
17
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
22 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.zeromq.ZMQ;
26
27 /**
28  *
29  */
30 public class ClientRequestHandlerTest {
31
32   private final Logger _logger = LoggerFactory.getLogger(ClientRequestHandlerTest.class);
33
34   ZMQ.Context context;
35   ExecutorService serverThread;
36   final String SERVER_ADDRESS = "localhost:5553";
37
38   ClientRequestHandler handler;
39
40   @Before
41   public void setUp() throws Exception {
42     context = ZMQ.context(1);
43     serverThread = Executors.newCachedThreadPool();
44     handler = new ClientRequestHandler(context);
45   }
46
47   @After
48   public void tearDown() throws Exception {
49     serverThread.shutdown();
50     MessagingUtil.closeZmqContext(context);
51     handler.close();
52   }
53
54  @Test
55   public void handle_SingleRemote_ShouldReturnResponse() throws Exception {
56     serverThread.execute(MessagingUtil.startReplyServer(context, SERVER_ADDRESS, 1));
57     Message request = new Message();
58     request.setRecipient(SERVER_ADDRESS);
59     Message response = handleMessageWithTimeout(request);
60     Assert.assertNotNull(response);
61     //should be connected to only 1 remote server
62     Assert.assertEquals(1, handler.getWorkerCount());
63     Assert.assertEquals(response.getRecipient(), SERVER_ADDRESS);
64   }
65
66  // @Test
67   public void handle_MultiRemote_ShouldReturnResponses() throws Exception {
68     ExecutorService threadPool = Executors.newCachedThreadPool();
69     final int port = 5555;
70     String serverAddress = null;
71     for (int i = 0; i < 5; i++) {
72       serverAddress = "localhost:" + (port + i);
73       serverThread.execute(MessagingUtil.startReplyServer(context, serverAddress, 1));
74       threadPool.execute(createEmptyMessageTaskAndHandle(handler, serverAddress));
75     }
76     Thread.sleep(5000);//wait for all messages to get processed
77     //should be connected to 5 remote server
78     Assert.assertEquals(5, handler.getWorkerCount());
79   }
80
81   private Runnable createEmptyMessageTaskAndHandle(final ClientRequestHandler handler, final String serverAddress) {
82
83     return new Runnable() {
84       @Override
85       public void run() {
86         Message request = new Message();
87         request.setRecipient(serverAddress);
88         try {
89           Message response = handleMessageWithTimeout(request);
90           Assert.assertNotNull(response);
91           Assert.assertEquals(response.getRecipient(), serverAddress);
92         } catch (Exception e) {
93           throw new RuntimeException(e);
94         }
95       }
96     };
97   }
98
99   private Message handleMessageWithTimeout(final Message request) {
100     Message response = null;
101
102     FutureTask<?> task = new FutureTask<Message>(new Callable<Message>() {
103
104       @Override
105       public Message call() {
106         try {
107           return handler.handle(request);
108         } catch (Exception e) {
109           _logger.debug("Client handler failed to handle request. Exception is [{}]", e);
110         }
111         return null;
112       }
113     });
114
115     serverThread.execute(task);
116
117     try {
118       response = (Message) task.get(5L, TimeUnit.SECONDS); //wait for max 5 sec for server to respond
119     } catch (Exception e) {/*ignore and continue*/}
120
121     return response;
122   }
123
124 }