Merge "Statistics-Manager - Performance Improvement 1) Introduced statistics request...
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.util.HashedWheelTimer;
22 import java.io.DataOutputStream;
23 import java.io.InputStream;
24 import java.io.InputStreamReader;
25 import java.lang.management.ManagementFactory;
26 import java.net.InetSocketAddress;
27 import java.net.Socket;
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Set;
33
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicLong;
40 import org.apache.commons.io.IOUtils;
41 import org.junit.After;
42 import org.junit.AfterClass;
43 import org.junit.Before;
44 import org.junit.BeforeClass;
45 import org.junit.Test;
46 import org.junit.runner.RunWith;
47 import org.junit.runners.Parameterized;
48 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
49 import org.opendaylight.controller.netconf.api.NetconfMessage;
50 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
51 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
52 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
53 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
54 import org.opendaylight.controller.netconf.mapping.api.Capability;
55 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
56 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
57 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
58 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
59 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
60 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
61 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
62 import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
63 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
64 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
65 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68 import org.w3c.dom.Document;
69
70 import com.google.common.collect.Sets;
71
72 import io.netty.channel.ChannelFuture;
73 import io.netty.channel.nio.NioEventLoopGroup;
74
75 @RunWith(Parameterized.class)
76 public class ConcurrentClientsTest {
77     private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
78
79     private static ExecutorService clientExecutor;
80
81     private static final int CONCURRENCY = 32;
82     private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
83
84     private int nettyThreads;
85     private Class<? extends Runnable> clientRunnable;
86     private Set<String> serverCaps;
87
88     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
89         this.nettyThreads = nettyThreads;
90         this.clientRunnable = clientRunnable;
91         this.serverCaps = serverCaps;
92     }
93
94     @Parameterized.Parameters()
95     public static Collection<Object[]> data() {
96         return Arrays.asList(new Object[][]{
97                 {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
98                 {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
99                 // empty set of capabilities = only base 1.0 netconf capability
100                 {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
101                 {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
102                 {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
103
104                 {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
105                 {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
106         });
107     }
108
109     private EventLoopGroup nettyGroup;
110     private NetconfClientDispatcher netconfClientDispatcher;
111
112     private DefaultCommitNotificationProducer commitNot;
113
114     HashedWheelTimer hashedWheelTimer;
115     private TestingNetconfOperation testingNetconfOperation;
116
117     public static SessionMonitoringService createMockedMonitoringService() {
118         SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
119         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
120         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
121         return monitoring;
122     }
123
124     @BeforeClass
125     public static void setUpClientExecutor() {
126         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
127             int i = 1;
128
129             @Override
130             public Thread newThread(final Runnable r) {
131                 Thread thread = new Thread(r);
132                 thread.setName("client-" + i++);
133                 thread.setDaemon(true);
134                 return thread;
135             }
136         });
137     }
138
139     @Before
140     public void setUp() throws Exception {
141
142         nettyGroup = new NioEventLoopGroup(nettyThreads);
143         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
144         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
145
146         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
147
148         testingNetconfOperation = new TestingNetconfOperation();
149         factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
150
151         SessionIdProvider idProvider = new SessionIdProvider();
152         hashedWheelTimer = new HashedWheelTimer();
153
154         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
155                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
156
157         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
158
159         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
160         final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
161
162         ChannelFuture s = dispatch.createServer(netconfAddress);
163         s.await();
164     }
165
166     @After
167     public void tearDown(){
168         commitNot.close();
169         hashedWheelTimer.stop();
170         try {
171             nettyGroup.shutdownGracefully().get();
172         } catch (InterruptedException | ExecutionException e) {
173             logger.warn("Ignoring exception while cleaning up after test", e);
174         }
175     }
176
177     @AfterClass
178     public static void tearDownClientExecutor() {
179         clientExecutor.shutdownNow();
180     }
181
182     @Test(timeout = CONCURRENCY * 1000)
183     public void testConcurrentClients() throws Exception {
184
185         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
186
187         for (int i = 0; i < CONCURRENCY; i++) {
188             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
189         }
190
191         for (Future<?> future : futures) {
192             try {
193                 future.get();
194             } catch (InterruptedException e) {
195                 throw new IllegalStateException(e);
196             } catch (ExecutionException e) {
197                 logger.error("Thread for testing client failed", e);
198                 fail("Client failed: " + e.getMessage());
199             }
200         }
201
202         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
203     }
204
205     public static Set<String> getOnlyExiServerCaps() {
206         return Sets.newHashSet(
207                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
208                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
209         );
210     }
211
212     public static Set<String> getOnlyChunkServerCaps() {
213         return Sets.newHashSet(
214                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
215                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
216         );
217     }
218
219     public Runnable getInstanceOfClientRunnable() throws Exception {
220         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
221     }
222
223     /**
224      * Responds to all operations except start-exi and counts all requests
225      */
226     private static class TestingNetconfOperation implements NetconfOperation {
227
228         private final AtomicLong counter = new AtomicLong();
229
230         @Override
231         public HandlingPriority canHandle(Document message) {
232             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
233                     HandlingPriority.CANNOT_HANDLE :
234                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
235         }
236
237         @Override
238         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
239             try {
240                 logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
241                 counter.getAndIncrement();
242                 return XmlUtil.readXmlToDocument("<test/>");
243             } catch (Exception e) {
244                 throw new RuntimeException(e);
245             }
246         }
247
248         public long getMessageCount() {
249             return counter.get();
250         }
251     }
252
253     /**
254      * Hardcoded operation service factory
255      */
256     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
257         private final NetconfOperation[] operations;
258
259         public TestingOperationServiceFactory(final NetconfOperation... operations) {
260             this.operations = operations;
261         }
262
263         @Override
264         public NetconfOperationService createService(String netconfSessionIdForReporting) {
265             return new NetconfOperationService() {
266                 @Override
267                 public Set<Capability> getCapabilities() {
268                     return Collections.emptySet();
269                 }
270
271                 @Override
272                 public Set<NetconfOperation> getNetconfOperations() {
273                     return Sets.newHashSet(operations);
274                 }
275
276                 @Override
277                 public void close() {}
278             };
279         }
280     }
281
282     /**
283      * Pure socket based blocking client
284      */
285     public final class BlockingClientRunnable implements Runnable {
286
287         @Override
288         public void run() {
289             try {
290                 run2();
291             } catch (Exception e) {
292                 throw new IllegalStateException(Thread.currentThread().getName(), e);
293             }
294         }
295
296         private void run2() throws Exception {
297             InputStream clientHello = checkNotNull(XmlFileLoader
298                     .getResourceAsStream("netconfMessages/client_hello.xml"));
299             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
300
301             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
302             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
303             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
304
305             StringBuffer sb = new StringBuffer();
306             while (sb.toString().endsWith("]]>]]>") == false) {
307                 sb.append((char) inFromServer.read());
308             }
309             logger.info(sb.toString());
310
311             outToServer.write(IOUtils.toByteArray(clientHello));
312             outToServer.write("]]>]]>".getBytes());
313             outToServer.flush();
314             // Thread.sleep(100);
315             outToServer.write(IOUtils.toByteArray(getConfig));
316             outToServer.write("]]>]]>".getBytes());
317             outToServer.flush();
318             Thread.sleep(100);
319             sb = new StringBuffer();
320             while (sb.toString().endsWith("]]>]]>") == false) {
321                 sb.append((char) inFromServer.read());
322             }
323             logger.info(sb.toString());
324             clientSocket.close();
325         }
326     }
327
328     /**
329      * TestingNetconfClient based runnable
330      */
331     public final class TestingNetconfClientRunnable implements Runnable {
332
333         @Override
334         public void run() {
335             try {
336                 final TestingNetconfClient netconfClient = new TestingNetconfClient(Thread.currentThread().getName(),
337                         netconfAddress, netconfClientDispatcher);
338                 long sessionId = netconfClient.getSessionId();
339                 logger.info("Client with session id {}: hello exchanged", sessionId);
340
341                 final NetconfMessage getMessage = XmlFileLoader
342                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
343                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
344                 logger.info("Client with session id {}: got result {}", sessionId, result);
345
346                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
347                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
348                                 + XmlUtil.toString(getMessage.getDocument()));
349
350                 netconfClient.close();
351                 logger.info("Client with session id {}: ended", sessionId);
352             } catch (final Exception e) {
353                 throw new IllegalStateException(Thread.currentThread().getName(), e);
354             }
355         }
356     }
357 }