Activate findbugs sonar profile
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.nio.NioEventLoopGroup;
25 import io.netty.util.HashedWheelTimer;
26 import io.netty.util.concurrent.GlobalEventExecutor;
27
28 import java.io.DataOutputStream;
29 import java.io.InputStream;
30 import java.io.InputStreamReader;
31 import java.lang.management.ManagementFactory;
32 import java.net.InetSocketAddress;
33 import java.net.Socket;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Set;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.atomic.AtomicLong;
45
46 import org.apache.commons.io.IOUtils;
47 import org.junit.After;
48 import org.junit.AfterClass;
49 import org.junit.Before;
50 import org.junit.BeforeClass;
51 import org.junit.Test;
52 import org.junit.runner.RunWith;
53 import org.junit.runners.Parameterized;
54 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
55 import org.opendaylight.controller.netconf.api.NetconfMessage;
56 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
57 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
58 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
59 import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
60 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
61 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
62 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
63 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
64 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
65 import org.opendaylight.controller.netconf.mapping.api.Capability;
66 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
67 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
68 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
69 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
70 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
71 import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
72 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
73 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
74 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
75 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
76 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79 import org.w3c.dom.Document;
80
81 @RunWith(Parameterized.class)
82 public class ConcurrentClientsTest {
83     private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
84
85     private static ExecutorService clientExecutor;
86
87     private static final int CONCURRENCY = 32;
88     private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
89
90     private int nettyThreads;
91     private Class<? extends Runnable> clientRunnable;
92     private Set<String> serverCaps;
93
94     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
95         this.nettyThreads = nettyThreads;
96         this.clientRunnable = clientRunnable;
97         this.serverCaps = serverCaps;
98     }
99
100     @Parameterized.Parameters()
101     public static Collection<Object[]> data() {
102         return Arrays.asList(new Object[][]{
103                 {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
104                 {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
105                 // empty set of capabilities = only base 1.0 netconf capability
106                 {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
107                 {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
108                 {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
109
110                 {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
111                 {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
112         });
113     }
114
115     private EventLoopGroup nettyGroup;
116     private NetconfClientDispatcher netconfClientDispatcher;
117
118     private DefaultCommitNotificationProducer commitNot;
119
120     HashedWheelTimer hashedWheelTimer;
121     private TestingNetconfOperation testingNetconfOperation;
122
123     public static SessionMonitoringService createMockedMonitoringService() {
124         SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
125         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
126         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
127         return monitoring;
128     }
129
130     @BeforeClass
131     public static void setUpClientExecutor() {
132         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
133             int i = 1;
134
135             @Override
136             public Thread newThread(final Runnable r) {
137                 Thread thread = new Thread(r);
138                 thread.setName("client-" + i++);
139                 thread.setDaemon(true);
140                 return thread;
141             }
142         });
143     }
144
145     @Before
146     public void setUp() throws Exception {
147         hashedWheelTimer = new HashedWheelTimer();
148         nettyGroup = new NioEventLoopGroup(nettyThreads);
149         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
150
151         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
152
153         testingNetconfOperation = new TestingNetconfOperation();
154         factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
155
156         SessionIdProvider idProvider = new SessionIdProvider();
157
158         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
159                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
160
161         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
162
163         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
164         final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
165
166         ChannelFuture s = dispatch.createServer(netconfAddress);
167         s.await();
168     }
169
170     @After
171     public void tearDown(){
172         commitNot.close();
173         hashedWheelTimer.stop();
174         try {
175             nettyGroup.shutdownGracefully().get();
176         } catch (InterruptedException | ExecutionException e) {
177             logger.warn("Ignoring exception while cleaning up after test", e);
178         }
179     }
180
181     @AfterClass
182     public static void tearDownClientExecutor() {
183         clientExecutor.shutdownNow();
184     }
185
186     @Test(timeout = CONCURRENCY * 1000)
187     public void testConcurrentClients() throws Exception {
188
189         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
190
191         for (int i = 0; i < CONCURRENCY; i++) {
192             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
193         }
194
195         for (Future<?> future : futures) {
196             try {
197                 future.get();
198             } catch (InterruptedException e) {
199                 throw new IllegalStateException(e);
200             } catch (ExecutionException e) {
201                 logger.error("Thread for testing client failed", e);
202                 fail("Client failed: " + e.getMessage());
203             }
204         }
205
206         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
207     }
208
209     public static Set<String> getOnlyExiServerCaps() {
210         return Sets.newHashSet(
211                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
212                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
213         );
214     }
215
216     public static Set<String> getOnlyChunkServerCaps() {
217         return Sets.newHashSet(
218                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
219                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
220         );
221     }
222
223     public Runnable getInstanceOfClientRunnable() throws Exception {
224         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
225     }
226
227     /**
228      * Responds to all operations except start-exi and counts all requests
229      */
230     private static class TestingNetconfOperation implements NetconfOperation {
231
232         private final AtomicLong counter = new AtomicLong();
233
234         @Override
235         public HandlingPriority canHandle(Document message) {
236             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
237                     HandlingPriority.CANNOT_HANDLE :
238                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
239         }
240
241         @Override
242         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
243             try {
244                 logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
245                 counter.getAndIncrement();
246                 return XmlUtil.readXmlToDocument("<test/>");
247             } catch (Exception e) {
248                 throw new RuntimeException(e);
249             }
250         }
251
252         public long getMessageCount() {
253             return counter.get();
254         }
255     }
256
257     /**
258      * Hardcoded operation service factory
259      */
260     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
261         private final NetconfOperation[] operations;
262
263         public TestingOperationServiceFactory(final NetconfOperation... operations) {
264             this.operations = operations;
265         }
266
267         @Override
268         public NetconfOperationService createService(String netconfSessionIdForReporting) {
269             return new NetconfOperationService() {
270                 @Override
271                 public Set<Capability> getCapabilities() {
272                     return Collections.emptySet();
273                 }
274
275                 @Override
276                 public Set<NetconfOperation> getNetconfOperations() {
277                     return Sets.newHashSet(operations);
278                 }
279
280                 @Override
281                 public void close() {}
282             };
283         }
284     }
285
286     /**
287      * Pure socket based blocking client
288      */
289     public final class BlockingClientRunnable implements Runnable {
290
291         @Override
292         public void run() {
293             try {
294                 run2();
295             } catch (Exception e) {
296                 throw new IllegalStateException(Thread.currentThread().getName(), e);
297             }
298         }
299
300         private void run2() throws Exception {
301             InputStream clientHello = checkNotNull(XmlFileLoader
302                     .getResourceAsStream("netconfMessages/client_hello.xml"));
303             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
304
305             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
306             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
307             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
308
309             StringBuffer sb = new StringBuffer();
310             while (sb.toString().endsWith("]]>]]>") == false) {
311                 sb.append((char) inFromServer.read());
312             }
313             logger.info(sb.toString());
314
315             outToServer.write(IOUtils.toByteArray(clientHello));
316             outToServer.write("]]>]]>".getBytes());
317             outToServer.flush();
318             // Thread.sleep(100);
319             outToServer.write(IOUtils.toByteArray(getConfig));
320             outToServer.write("]]>]]>".getBytes());
321             outToServer.flush();
322             Thread.sleep(100);
323             sb = new StringBuffer();
324             while (sb.toString().endsWith("]]>]]>") == false) {
325                 sb.append((char) inFromServer.read());
326             }
327             logger.info(sb.toString());
328             clientSocket.close();
329         }
330     }
331
332     /**
333      * TestingNetconfClient based runnable
334      */
335     public final class TestingNetconfClientRunnable implements Runnable {
336
337         @Override
338         public void run() {
339             try {
340                 final TestingNetconfClient netconfClient =
341                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
342                 long sessionId = netconfClient.getSessionId();
343                 logger.info("Client with session id {}: hello exchanged", sessionId);
344
345                 final NetconfMessage getMessage = XmlFileLoader
346                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
347                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
348                 logger.info("Client with session id {}: got result {}", sessionId, result);
349
350                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
351                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
352                                 + XmlUtil.toString(getMessage.getDocument()));
353
354                 netconfClient.close();
355                 logger.info("Client with session id {}: ended", sessionId);
356             } catch (final Exception e) {
357                 throw new IllegalStateException(Thread.currentThread().getName(), e);
358             }
359         }
360
361         private NetconfClientConfiguration getClientConfig() {
362             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
363             b.withAddress(netconfAddress);
364             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
365                     "client"));
366             b.withSessionListener(new SimpleNetconfClientSessionListener());
367             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
368                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
369             return b.build();
370         }
371     }
372 }