983bca0a2d504c463bf54f05c2adf41e682c6495
[netconf.git] / protocol / netconf-server / src / test / java / org / opendaylight / netconf / server / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.server;
9
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.google.common.io.ByteStreams;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.netconf.api.CapabilityURN;
import org.opendaylight.netconf.api.DocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.netconf.api.xml.XmlUtil;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.netconf.client.NetconfMessageUtil;
import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.netconf.client.TestingNetconfClient;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
import org.opendaylight.netconf.nettyutil.NeverReconnectStrategy;
import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
import org.opendaylight.netconf.server.api.SessionIdProvider;
import org.opendaylight.netconf.server.api.monitoring.Capability;
import org.opendaylight.netconf.server.api.monitoring.CapabilityListener;
import org.opendaylight.netconf.server.api.monitoring.NetconfMonitoringService;
import org.opendaylight.netconf.server.api.monitoring.SessionEvent;
import org.opendaylight.netconf.server.api.monitoring.SessionListener;
import org.opendaylight.netconf.server.api.operations.HandlingPriority;
import org.opendaylight.netconf.server.api.operations.NetconfOperation;
import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
import org.opendaylight.netconf.test.util.XmlFileLoader;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
80
81 @RunWith(Parameterized.class)
82 public class ConcurrentClientsTest {
83     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
84
85     private static ExecutorService clientExecutor;
86
87     private static final int CONCURRENCY = 32;
88     private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
89
90     private final int nettyThreads;
91     private final Class<? extends Runnable> clientRunnable;
92     private final Set<String> serverCaps;
93
94     public ConcurrentClientsTest(final int nettyThreads, final Class<? extends Runnable> clientRunnable,
95             final Set<String> serverCaps) {
96         this.nettyThreads = nettyThreads;
97         this.clientRunnable = clientRunnable;
98         this.serverCaps = serverCaps;
99     }
100
101     @Parameterized.Parameters()
102     public static Collection<Object[]> data() {
103         return List.of(new Object[][]{
104             { 4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
105             { 1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
106             // empty set of capabilities = only base 1.0 netconf capability
107             { 4, TestingNetconfClientRunnable.class, Set.of()},
108             { 4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
109             { 4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
110             { 4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
111             { 1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
112         });
113     }
114
115     private EventLoopGroup nettyGroup;
116     private NetconfClientDispatcher netconfClientDispatcher;
117
118     HashedWheelTimer hashedWheelTimer;
119     private TestingNetconfOperation testingNetconfOperation;
120
121     public static NetconfMonitoringService createMockedMonitoringService() {
122         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
123         final SessionListener sessionListener = mock(SessionListener.class);
124         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
125         doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
126         doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
127         doReturn((Registration) () -> { }).when(monitoring)
128             .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
129         doReturn(sessionListener).when(monitoring).getSessionListener();
130         doReturn(new CapabilitiesBuilder().setCapability(Set.of()).build()).when(monitoring).getCapabilities();
131         return monitoring;
132     }
133
134     @BeforeClass
135     public static void setUpClientExecutor() {
136         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
137             int index = 1;
138
139             @Override
140             public Thread newThread(final Runnable runnable) {
141                 Thread thread = new Thread(runnable);
142                 thread.setName("client-" + index++);
143                 thread.setDaemon(true);
144                 return thread;
145             }
146         });
147     }
148
149     @Before
150     public void setUp() throws Exception {
151         hashedWheelTimer = new HashedWheelTimer();
152         nettyGroup = new NioEventLoopGroup(nettyThreads);
153         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
154
155         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
156
157         testingNetconfOperation = new TestingNetconfOperation();
158         factoriesListener.onAddNetconfOperationServiceFactory(
159                 new TestingOperationServiceFactory(testingNetconfOperation));
160
161         SessionIdProvider idProvider = new DefaultSessionIdProvider();
162
163         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
164                 NetconfServerSessionNegotiatorFactoryBuilder()
165                 .setTimer(hashedWheelTimer)
166                 .setAggregatedOpService(factoriesListener)
167                 .setIdProvider(idProvider)
168                 .setConnectionTimeoutMillis(5000)
169                 .setMonitoringService(createMockedMonitoringService())
170                 .setBaseCapabilities(serverCaps)
171                 .build();
172
173         ServerChannelInitializer serverChannelInitializer =
174                 new ServerChannelInitializer(serverNegotiatorFactory);
175         final NetconfServerDispatcherImpl dispatch =
176                 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
177
178         ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
179         server.await();
180     }
181
182     @After
183     public void tearDown() {
184         hashedWheelTimer.stop();
185         try {
186             nettyGroup.shutdownGracefully().get();
187         } catch (InterruptedException | ExecutionException e) {
188             LOG.warn("Ignoring exception while cleaning up after test", e);
189         }
190     }
191
192     @AfterClass
193     public static void tearDownClientExecutor() {
194         clientExecutor.shutdownNow();
195     }
196
197     @Test(timeout = CONCURRENCY * 1000)
198     public void testConcurrentClients() throws Exception {
199
200         List<Future<?>> futures = new ArrayList<>(CONCURRENCY);
201
202         for (int i = 0; i < CONCURRENCY; i++) {
203             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
204         }
205
206         for (Future<?> future : futures) {
207             try {
208                 future.get();
209             } catch (InterruptedException e) {
210                 throw new IllegalStateException(e);
211             } catch (ExecutionException e) {
212                 LOG.error("Thread for testing client failed", e);
213                 throw e;
214             }
215         }
216
217         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
218     }
219
220     public static Set<String> getOnlyExiServerCaps() {
221         return Set.of(CapabilityURN.BASE, CapabilityURN.EXI);
222     }
223
224     public static Set<String> getOnlyChunkServerCaps() {
225         return Set.of(CapabilityURN.BASE, CapabilityURN.BASE_1_1);
226     }
227
228     public Runnable getInstanceOfClientRunnable() throws Exception {
229         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
230     }
231
232     /**
233      * Responds to all operations except start-exi and counts all requests.
234      */
235     private static class TestingNetconfOperation implements NetconfOperation {
236
237         private final AtomicLong counter = new AtomicLong();
238
239         @Override
240         public HandlingPriority canHandle(final Document message) {
241             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI)
242                     ? HandlingPriority.CANNOT_HANDLE :
243                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
244         }
245
246         @SuppressWarnings("checkstyle:IllegalCatch")
247         @Override
248         public Document handle(final Document requestMessage,
249                 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
250             try {
251                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
252                 counter.getAndIncrement();
253                 return XmlUtil.readXmlToDocument("<test/>");
254             } catch (Exception e) {
255                 throw new RuntimeException(e);
256             }
257         }
258
259         public long getMessageCount() {
260             return counter.get();
261         }
262     }
263
264     /**
265      * Hardcoded operation service factory.
266      */
267     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
268         private final NetconfOperation[] operations;
269
270         TestingOperationServiceFactory(final NetconfOperation... operations) {
271             this.operations = operations;
272         }
273
274         @Override
275         public Set<Capability> getCapabilities() {
276             return Set.of();
277         }
278
279         @Override
280         public Registration registerCapabilityListener(final CapabilityListener listener) {
281             // No-op
282             return () -> { };
283         }
284
285         @Override
286         public NetconfOperationService createService(final SessionIdType sessionId) {
287             return new NetconfOperationService() {
288
289                 @Override
290                 public Set<NetconfOperation> getNetconfOperations() {
291                     return Set.of(operations);
292                 }
293
294                 @Override
295                 public void close() {
296                 }
297             };
298         }
299     }
300
301     /**
302      * Pure socket based blocking client.
303      */
304     public final class BlockingClientRunnable implements Runnable {
305
306         @Override
307         @SuppressWarnings("checkstyle:IllegalCatch")
308         public void run() {
309             try {
310                 run2();
311             } catch (Exception e) {
312                 throw new IllegalStateException(Thread.currentThread().getName(), e);
313             }
314         }
315
316         private void run2() throws Exception {
317             InputStream clientHello = requireNonNull(XmlFileLoader.getResourceAsStream(
318                 "netconfMessages/client_hello.xml"));
319             final InputStream getConfig = requireNonNull(XmlFileLoader.getResourceAsStream(
320                 "netconfMessages/getConfig.xml"));
321
322             Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
323             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
324             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
325
326             StringBuilder sb = new StringBuilder();
327             while (!sb.toString().endsWith("]]>]]>")) {
328                 sb.append((char) inFromServer.read());
329             }
330             LOG.info(sb.toString());
331
332             outToServer.write(ByteStreams.toByteArray(clientHello));
333             outToServer.write("]]>]]>".getBytes());
334             outToServer.flush();
335             // Thread.sleep(100);
336             outToServer.write(ByteStreams.toByteArray(getConfig));
337             outToServer.write("]]>]]>".getBytes());
338             outToServer.flush();
339             Thread.sleep(100);
340             sb = new StringBuilder();
341             while (!sb.toString().endsWith("]]>]]>")) {
342                 sb.append((char) inFromServer.read());
343             }
344             LOG.info(sb.toString());
345             clientSocket.close();
346         }
347     }
348
349     /**
350      * TestingNetconfClient based runnable.
351      */
352     public final class TestingNetconfClientRunnable implements Runnable {
353
354         @SuppressWarnings("checkstyle:IllegalCatch")
355         @Override
356         public void run() {
357             try {
358                 final TestingNetconfClient netconfClient =
359                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
360                                 getClientConfig());
361                 final var sessionId = netconfClient.sessionId();
362                 LOG.info("Client with session id {}: hello exchanged", sessionId);
363
364                 final NetconfMessage getMessage = XmlFileLoader
365                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
366                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
367                 LOG.info("Client with session id {}: got result {}", sessionId.getValue(), result);
368
369                 checkState(NetconfMessageUtil.isErrorMessage(result) == false,
370                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
371                                 + XmlUtil.toString(getMessage.getDocument()));
372
373                 netconfClient.close();
374                 LOG.info("Client with session id {}: ended", sessionId.getValue());
375             } catch (final Exception e) {
376                 throw new IllegalStateException(Thread.currentThread().getName(), e);
377             }
378         }
379
380         private NetconfClientConfiguration getClientConfig() {
381             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
382             b.withAddress(NETCONF_ADDRESS);
383             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
384                     "client"));
385             b.withSessionListener(new SimpleNetconfClientSessionListener());
386             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
387                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
388             return b.build();
389         }
390     }
391 }