Integrate netconf-mapping-api into netconf-server
[netconf.git] / protocol / netconf-server / src / test / java / org / opendaylight / netconf / server / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.server;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.collect.Sets;
19 import com.google.common.io.ByteStreams;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.channel.nio.NioEventLoopGroup;
23 import io.netty.util.HashedWheelTimer;
24 import io.netty.util.concurrent.GlobalEventExecutor;
25 import java.io.DataOutputStream;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.net.InetSocketAddress;
29 import java.net.Socket;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Set;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.atomic.AtomicLong;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.junit.runners.Parameterized;
49 import org.opendaylight.netconf.api.DocumentedException;
50 import org.opendaylight.netconf.api.NetconfMessage;
51 import org.opendaylight.netconf.api.capability.Capability;
52 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
53 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
54 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
55 import org.opendaylight.netconf.api.monitoring.SessionEvent;
56 import org.opendaylight.netconf.api.monitoring.SessionListener;
57 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
58 import org.opendaylight.netconf.api.xml.XmlUtil;
59 import org.opendaylight.netconf.client.NetconfClientDispatcher;
60 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
61 import org.opendaylight.netconf.client.NetconfMessageUtil;
62 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
63 import org.opendaylight.netconf.client.TestingNetconfClient;
64 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
65 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
66 import org.opendaylight.netconf.nettyutil.NeverReconnectStrategy;
67 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
68 import org.opendaylight.netconf.server.api.SessionIdProvider;
69 import org.opendaylight.netconf.server.api.operations.HandlingPriority;
70 import org.opendaylight.netconf.server.api.operations.NetconfOperation;
71 import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
72 import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
73 import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
74 import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
75 import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
76 import org.opendaylight.netconf.util.test.XmlFileLoader;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
78 import org.opendaylight.yangtools.concepts.Registration;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81 import org.w3c.dom.Document;
82
83 @RunWith(Parameterized.class)
84 public class ConcurrentClientsTest {
85     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
86
87     private static ExecutorService clientExecutor;
88
89     private static final int CONCURRENCY = 32;
90     private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
91
92     private final int nettyThreads;
93     private final Class<? extends Runnable> clientRunnable;
94     private final Set<String> serverCaps;
95
96     public ConcurrentClientsTest(final int nettyThreads, final Class<? extends Runnable> clientRunnable,
97             final Set<String> serverCaps) {
98         this.nettyThreads = nettyThreads;
99         this.clientRunnable = clientRunnable;
100         this.serverCaps = serverCaps;
101     }
102
103     @Parameterized.Parameters()
104     public static Collection<Object[]> data() {
105         return Arrays.asList(new Object[][]{
106             { 4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
107             { 1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
108             // empty set of capabilities = only base 1.0 netconf capability
109             { 4, TestingNetconfClientRunnable.class, Collections.emptySet()},
110             { 4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
111             { 4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
112             { 4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
113             { 1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
114         });
115     }
116
117     private EventLoopGroup nettyGroup;
118     private NetconfClientDispatcher netconfClientDispatcher;
119
120     HashedWheelTimer hashedWheelTimer;
121     private TestingNetconfOperation testingNetconfOperation;
122
123     public static NetconfMonitoringService createMockedMonitoringService() {
124         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
125         final SessionListener sessionListener = mock(SessionListener.class);
126         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
127         doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
128         doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
129         doReturn((Registration) () -> { }).when(monitoring)
130             .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
131         doReturn(sessionListener).when(monitoring).getSessionListener();
132         doReturn(new CapabilitiesBuilder().setCapability(Set.of()).build()).when(monitoring).getCapabilities();
133         return monitoring;
134     }
135
136     @BeforeClass
137     public static void setUpClientExecutor() {
138         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
139             int index = 1;
140
141             @Override
142             public Thread newThread(final Runnable runnable) {
143                 Thread thread = new Thread(runnable);
144                 thread.setName("client-" + index++);
145                 thread.setDaemon(true);
146                 return thread;
147             }
148         });
149     }
150
151     @Before
152     public void setUp() throws Exception {
153         hashedWheelTimer = new HashedWheelTimer();
154         nettyGroup = new NioEventLoopGroup(nettyThreads);
155         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
156
157         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
158
159         testingNetconfOperation = new TestingNetconfOperation();
160         factoriesListener.onAddNetconfOperationServiceFactory(
161                 new TestingOperationServiceFactory(testingNetconfOperation));
162
163         SessionIdProvider idProvider = new DefaultSessionIdProvider();
164
165         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
166                 NetconfServerSessionNegotiatorFactoryBuilder()
167                 .setTimer(hashedWheelTimer)
168                 .setAggregatedOpService(factoriesListener)
169                 .setIdProvider(idProvider)
170                 .setConnectionTimeoutMillis(5000)
171                 .setMonitoringService(createMockedMonitoringService())
172                 .setBaseCapabilities(serverCaps)
173                 .build();
174
175         ServerChannelInitializer serverChannelInitializer =
176                 new ServerChannelInitializer(serverNegotiatorFactory);
177         final NetconfServerDispatcherImpl dispatch =
178                 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
179
180         ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
181         server.await();
182     }
183
184     @After
185     public void tearDown() {
186         hashedWheelTimer.stop();
187         try {
188             nettyGroup.shutdownGracefully().get();
189         } catch (InterruptedException | ExecutionException e) {
190             LOG.warn("Ignoring exception while cleaning up after test", e);
191         }
192     }
193
194     @AfterClass
195     public static void tearDownClientExecutor() {
196         clientExecutor.shutdownNow();
197     }
198
199     @Test(timeout = CONCURRENCY * 1000)
200     public void testConcurrentClients() throws Exception {
201
202         List<Future<?>> futures = new ArrayList<>(CONCURRENCY);
203
204         for (int i = 0; i < CONCURRENCY; i++) {
205             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
206         }
207
208         for (Future<?> future : futures) {
209             try {
210                 future.get();
211             } catch (InterruptedException e) {
212                 throw new IllegalStateException(e);
213             } catch (ExecutionException e) {
214                 LOG.error("Thread for testing client failed", e);
215                 throw e;
216             }
217         }
218
219         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
220     }
221
222     public static Set<String> getOnlyExiServerCaps() {
223         return Sets.newHashSet(
224                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
225                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
226         );
227     }
228
229     public static Set<String> getOnlyChunkServerCaps() {
230         return Sets.newHashSet(
231                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
232                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
233         );
234     }
235
236     public Runnable getInstanceOfClientRunnable() throws Exception {
237         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
238     }
239
240     /**
241      * Responds to all operations except start-exi and counts all requests.
242      */
243     private static class TestingNetconfOperation implements NetconfOperation {
244
245         private final AtomicLong counter = new AtomicLong();
246
247         @Override
248         public HandlingPriority canHandle(final Document message) {
249             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI)
250                     ? HandlingPriority.CANNOT_HANDLE :
251                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
252         }
253
254         @SuppressWarnings("checkstyle:IllegalCatch")
255         @Override
256         public Document handle(final Document requestMessage,
257                 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
258             try {
259                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
260                 counter.getAndIncrement();
261                 return XmlUtil.readXmlToDocument("<test/>");
262             } catch (Exception e) {
263                 throw new RuntimeException(e);
264             }
265         }
266
267         public long getMessageCount() {
268             return counter.get();
269         }
270     }
271
272     /**
273      * Hardcoded operation service factory.
274      */
275     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
276         private final NetconfOperation[] operations;
277
278         TestingOperationServiceFactory(final NetconfOperation... operations) {
279             this.operations = operations;
280         }
281
282         @Override
283         public Set<Capability> getCapabilities() {
284             return Collections.emptySet();
285         }
286
287         @Override
288         public Registration registerCapabilityListener(final CapabilityListener listener) {
289             // No-op
290             return () -> { };
291         }
292
293         @Override
294         public NetconfOperationService createService(final String netconfSessionIdForReporting) {
295             return new NetconfOperationService() {
296
297                 @Override
298                 public Set<NetconfOperation> getNetconfOperations() {
299                     return Sets.newHashSet(operations);
300                 }
301
302                 @Override
303                 public void close() {
304                 }
305             };
306         }
307     }
308
309     /**
310      * Pure socket based blocking client.
311      */
312     public final class BlockingClientRunnable implements Runnable {
313
314         @Override
315         @SuppressWarnings("checkstyle:IllegalCatch")
316         public void run() {
317             try {
318                 run2();
319             } catch (Exception e) {
320                 throw new IllegalStateException(Thread.currentThread().getName(), e);
321             }
322         }
323
324         private void run2() throws Exception {
325             InputStream clientHello = requireNonNull(XmlFileLoader.getResourceAsStream(
326                 "netconfMessages/client_hello.xml"));
327             final InputStream getConfig = requireNonNull(XmlFileLoader.getResourceAsStream(
328                 "netconfMessages/getConfig.xml"));
329
330             Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
331             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
332             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
333
334             StringBuilder sb = new StringBuilder();
335             while (!sb.toString().endsWith("]]>]]>")) {
336                 sb.append((char) inFromServer.read());
337             }
338             LOG.info(sb.toString());
339
340             outToServer.write(ByteStreams.toByteArray(clientHello));
341             outToServer.write("]]>]]>".getBytes());
342             outToServer.flush();
343             // Thread.sleep(100);
344             outToServer.write(ByteStreams.toByteArray(getConfig));
345             outToServer.write("]]>]]>".getBytes());
346             outToServer.flush();
347             Thread.sleep(100);
348             sb = new StringBuilder();
349             while (!sb.toString().endsWith("]]>]]>")) {
350                 sb.append((char) inFromServer.read());
351             }
352             LOG.info(sb.toString());
353             clientSocket.close();
354         }
355     }
356
357     /**
358      * TestingNetconfClient based runnable.
359      */
360     public final class TestingNetconfClientRunnable implements Runnable {
361
362         @SuppressWarnings("checkstyle:IllegalCatch")
363         @Override
364         public void run() {
365             try {
366                 final TestingNetconfClient netconfClient =
367                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
368                                 getClientConfig());
369                 long sessionId = netconfClient.getSessionId();
370                 LOG.info("Client with session id {}: hello exchanged", sessionId);
371
372                 final NetconfMessage getMessage = XmlFileLoader
373                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
374                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
375                 LOG.info("Client with session id {}: got result {}", sessionId, result);
376
377                 checkState(NetconfMessageUtil.isErrorMessage(result) == false,
378                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
379                                 + XmlUtil.toString(getMessage.getDocument()));
380
381                 netconfClient.close();
382                 LOG.info("Client with session id {}: ended", sessionId);
383             } catch (final Exception e) {
384                 throw new IllegalStateException(Thread.currentThread().getName(), e);
385             }
386         }
387
388         private NetconfClientConfiguration getClientConfig() {
389             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
390             b.withAddress(NETCONF_ADDRESS);
391             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
392                     "client"));
393             b.withSessionListener(new SimpleNetconfClientSessionListener());
394             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
395                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
396             return b.build();
397         }
398     }
399 }