fix ServiceHandler SpotBugs false positives
[transportpce.git] / tests / honeynode / 1.2.1 / netconf-impl / src / test / java / org / opendaylight / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.netconf.api.DocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.capability.Capability;
import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.netconf.api.monitoring.CapabilityListener;
import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
import org.opendaylight.netconf.api.monitoring.SessionEvent;
import org.opendaylight.netconf.api.monitoring.SessionListener;
import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.netconf.api.xml.XmlUtil;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.netconf.client.TestingNetconfClient;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
import org.opendaylight.netconf.impl.osgi.AggregatedNetconfOperationServiceFactory;
import org.opendaylight.netconf.mapping.api.HandlingPriority;
import org.opendaylight.netconf.mapping.api.NetconfOperation;
import org.opendaylight.netconf.mapping.api.NetconfOperationChainedExecution;
import org.opendaylight.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
import org.opendaylight.netconf.util.messages.NetconfMessageUtil;
import org.opendaylight.netconf.util.test.XmlFileLoader;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
80
81 @RunWith(Parameterized.class)
82 public class ConcurrentClientsTest {
83     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
84
85     private static ExecutorService clientExecutor;
86
87     private static final int CONCURRENCY = 32;
88     private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
89
90     private final int nettyThreads;
91     private final Class<? extends Runnable> clientRunnable;
92     private final Set<String> serverCaps;
93
94     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
95         this.nettyThreads = nettyThreads;
96         this.clientRunnable = clientRunnable;
97         this.serverCaps = serverCaps;
98     }
99
100     @Parameterized.Parameters()
101     public static Collection<Object[]> data() {
102         return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class,
103                 NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
104             {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
105             // empty set of capabilities = only base 1.0 netconf capability
106             {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
107             {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
108             {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
109             {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
110             {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
111         });
112     }
113
114     private EventLoopGroup nettyGroup;
115     private NetconfClientDispatcher netconfClientDispatcher;
116
117     HashedWheelTimer hashedWheelTimer;
118     private TestingNetconfOperation testingNetconfOperation;
119
120     public static NetconfMonitoringService createMockedMonitoringService() {
121         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
122         final SessionListener sessionListener = mock(SessionListener.class);
123         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
124         doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
125         doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
126         doReturn((AutoCloseable) () -> {
127
128         }).when(monitoring).registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
129         doReturn(sessionListener).when(monitoring).getSessionListener();
130         doReturn(new CapabilitiesBuilder().setCapability(Collections.emptyList()).build()).when(monitoring)
131                 .getCapabilities();
132         return monitoring;
133     }
134
135     @BeforeClass
136     public static void setUpClientExecutor() {
137         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
138             int index = 1;
139
140             @Override
141             public Thread newThread(final Runnable runnable) {
142                 Thread thread = new Thread(runnable);
143                 thread.setName("client-" + index++);
144                 thread.setDaemon(true);
145                 return thread;
146             }
147         });
148     }
149
150     @Before
151     public void setUp() throws Exception {
152         hashedWheelTimer = new HashedWheelTimer();
153         nettyGroup = new NioEventLoopGroup(nettyThreads);
154         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
155
156         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
157
158         testingNetconfOperation = new TestingNetconfOperation();
159         factoriesListener.onAddNetconfOperationServiceFactory(
160                 new TestingOperationServiceFactory(testingNetconfOperation));
161
162         SessionIdProvider idProvider = new SessionIdProvider();
163
164         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
165                 NetconfServerSessionNegotiatorFactoryBuilder()
166                 .setTimer(hashedWheelTimer)
167                 .setAggregatedOpService(factoriesListener)
168                 .setIdProvider(idProvider)
169                 .setConnectionTimeoutMillis(5000)
170                 .setMonitoringService(createMockedMonitoringService())
171                 .setBaseCapabilities(serverCaps)
172                 .build();
173
174         NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer =
175                 new NetconfServerDispatcherImpl.ServerChannelInitializer(serverNegotiatorFactory);
176         final NetconfServerDispatcherImpl dispatch =
177                 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
178
179         ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
180         server.await();
181     }
182
183     @After
184     public void tearDown() {
185         hashedWheelTimer.stop();
186         try {
187             nettyGroup.shutdownGracefully().get();
188         } catch (InterruptedException | ExecutionException e) {
189             LOG.warn("Ignoring exception while cleaning up after test", e);
190         }
191     }
192
193     @AfterClass
194     public static void tearDownClientExecutor() {
195         clientExecutor.shutdownNow();
196     }
197
198     @Test(timeout = CONCURRENCY * 1000)
199     public void testConcurrentClients() throws Exception {
200
201         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
202
203         for (int i = 0; i < CONCURRENCY; i++) {
204             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
205         }
206
207         for (Future<?> future : futures) {
208             try {
209                 future.get();
210             } catch (InterruptedException e) {
211                 throw new IllegalStateException(e);
212             } catch (ExecutionException e) {
213                 LOG.error("Thread for testing client failed", e);
214                 throw e;
215             }
216         }
217
218         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
219     }
220
221     public static Set<String> getOnlyExiServerCaps() {
222         return Sets.newHashSet(
223                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
224                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
225         );
226     }
227
228     public static Set<String> getOnlyChunkServerCaps() {
229         return Sets.newHashSet(
230                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
231                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
232         );
233     }
234
235     public Runnable getInstanceOfClientRunnable() throws Exception {
236         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
237     }
238
239     /**
240      * Responds to all operations except start-exi and counts all requests.
241      */
242     private static class TestingNetconfOperation implements NetconfOperation {
243
244         private final AtomicLong counter = new AtomicLong();
245
246         @Override
247         public HandlingPriority canHandle(Document message) {
248             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI)
249                     ? HandlingPriority.CANNOT_HANDLE :
250                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
251         }
252
253         @SuppressWarnings("checkstyle:IllegalCatch")
254         @Override
255         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation)
256                 throws DocumentedException {
257             try {
258                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
259                 counter.getAndIncrement();
260                 return XmlUtil.readXmlToDocument("<test/>");
261             } catch (Exception e) {
262                 throw new RuntimeException(e);
263             }
264         }
265
266         public long getMessageCount() {
267             return counter.get();
268         }
269     }
270
271     /**
272      * Hardcoded operation service factory.
273      */
274     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
275         private final NetconfOperation[] operations;
276
277         TestingOperationServiceFactory(final NetconfOperation... operations) {
278             this.operations = operations;
279         }
280
281         @Override
282         public Set<Capability> getCapabilities() {
283             return Collections.emptySet();
284         }
285
286         @Override
287         public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
288             return () -> {
289             };
290         }
291
292         @Override
293         public NetconfOperationService createService(String netconfSessionIdForReporting) {
294             return new NetconfOperationService() {
295
296                 @Override
297                 public Set<NetconfOperation> getNetconfOperations() {
298                     return Sets.newHashSet(operations);
299                 }
300
301                 @Override
302                 public void close() {
303                 }
304             };
305         }
306     }
307
308     /**
309      * Pure socket based blocking client.
310      */
311     @SuppressWarnings("checkstyle:IllegalCatch")
312     public final class BlockingClientRunnable implements Runnable {
313
314         @Override
315         public void run() {
316             try {
317                 run2();
318             } catch (Exception e) {
319                 throw new IllegalStateException(Thread.currentThread().getName(), e);
320             }
321         }
322
323         private void run2() throws Exception {
324             InputStream clientHello = checkNotNull(XmlFileLoader
325                     .getResourceAsStream("netconfMessages/client_hello.xml"));
326             final InputStream getConfig =
327                     checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
328
329             Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
330             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
331             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
332
333             StringBuffer sb = new StringBuffer();
334             while (sb.toString().endsWith("]]>]]>") == false) {
335                 sb.append((char) inFromServer.read());
336             }
337             LOG.info(sb.toString());
338
339             outToServer.write(ByteStreams.toByteArray(clientHello));
340             outToServer.write("]]>]]>".getBytes());
341             outToServer.flush();
342             // Thread.sleep(100);
343             outToServer.write(ByteStreams.toByteArray(getConfig));
344             outToServer.write("]]>]]>".getBytes());
345             outToServer.flush();
346             Thread.sleep(100);
347             sb = new StringBuffer();
348             while (sb.toString().endsWith("]]>]]>") == false) {
349                 sb.append((char) inFromServer.read());
350             }
351             LOG.info(sb.toString());
352             clientSocket.close();
353         }
354     }
355
356     /**
357      * TestingNetconfClient based runnable.
358      */
359     public final class TestingNetconfClientRunnable implements Runnable {
360
361         @SuppressWarnings("checkstyle:IllegalCatch")
362         @Override
363         public void run() {
364             try {
365                 final TestingNetconfClient netconfClient =
366                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
367                                 getClientConfig());
368                 long sessionId = netconfClient.getSessionId();
369                 LOG.info("Client with session id {}: hello exchanged", sessionId);
370
371                 final NetconfMessage getMessage = XmlFileLoader
372                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
373                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
374                 LOG.info("Client with session id {}: got result {}", sessionId, result);
375
376                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
377                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
378                                 + XmlUtil.toString(getMessage.getDocument()));
379
380                 netconfClient.close();
381                 LOG.info("Client with session id {}: ended", sessionId);
382             } catch (final Exception e) {
383                 throw new IllegalStateException(Thread.currentThread().getName(), e);
384             }
385         }
386
387         private NetconfClientConfiguration getClientConfig() {
388             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
389             b.withAddress(NETCONF_ADDRESS);
390             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
391                     "client"));
392             b.withSessionListener(new SimpleNetconfClientSessionListener());
393             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
394                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
395             return b.build();
396         }
397     }
398 }