Introduce NetconfTimer
[netconf.git] / protocol / netconf-server / src / test / java / org / opendaylight / netconf / server / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.server;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static java.util.concurrent.TimeUnit.MILLISECONDS;
13 import static org.junit.jupiter.api.Assertions.assertEquals;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.lenient;
18 import static org.opendaylight.netconf.server.NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES;
19
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.net.InetAddress;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.nio.charset.StandardCharsets;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.atomic.AtomicLong;
35 import org.junit.jupiter.api.AfterAll;
36 import org.junit.jupiter.api.AfterEach;
37 import org.junit.jupiter.api.BeforeAll;
38 import org.junit.jupiter.api.Timeout;
39 import org.junit.jupiter.api.extension.ExtendWith;
40 import org.junit.jupiter.params.ParameterizedTest;
41 import org.junit.jupiter.params.provider.Arguments;
42 import org.junit.jupiter.params.provider.MethodSource;
43 import org.mockito.Mock;
44 import org.mockito.junit.jupiter.MockitoExtension;
45 import org.opendaylight.netconf.api.CapabilityURN;
46 import org.opendaylight.netconf.api.DocumentedException;
47 import org.opendaylight.netconf.api.messages.NetconfMessage;
48 import org.opendaylight.netconf.api.xml.XmlUtil;
49 import org.opendaylight.netconf.client.NetconfClientFactory;
50 import org.opendaylight.netconf.client.NetconfClientFactoryImpl;
51 import org.opendaylight.netconf.client.NetconfMessageUtil;
52 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
53 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
54 import org.opendaylight.netconf.common.impl.DefaultNetconfTimer;
55 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessageProvider;
56 import org.opendaylight.netconf.server.api.SessionIdProvider;
57 import org.opendaylight.netconf.server.api.monitoring.Capability;
58 import org.opendaylight.netconf.server.api.monitoring.CapabilityListener;
59 import org.opendaylight.netconf.server.api.monitoring.NetconfMonitoringService;
60 import org.opendaylight.netconf.server.api.monitoring.SessionEvent;
61 import org.opendaylight.netconf.server.api.monitoring.SessionListener;
62 import org.opendaylight.netconf.server.api.operations.HandlingPriority;
63 import org.opendaylight.netconf.server.api.operations.NetconfOperation;
64 import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
65 import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
66 import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
67 import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
68 import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
69 import org.opendaylight.netconf.test.util.XmlFileLoader;
70 import org.opendaylight.netconf.transport.ssh.SSHTransportStackFactory;
71 import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
72 import org.opendaylight.netconf.transport.tcp.TCPServer;
73 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
74 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
75 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.client.rev231228.netconf.client.initiate.stack.grouping.transport.ssh.ssh.TcpClientParametersBuilder;
78 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
79 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
80 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.server.rev231228.netconf.server.listen.stack.grouping.transport.ssh.ssh.TcpServerParametersBuilder;
81 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev231228.TcpClientGrouping;
82 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev231228.TcpServerGrouping;
83 import org.opendaylight.yangtools.concepts.Registration;
84 import org.opendaylight.yangtools.yang.common.Uint16;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87 import org.w3c.dom.Document;
88
89 @ExtendWith(MockitoExtension.class)
90 public class ConcurrentClientsTest {
91     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
92
93     private static final int CONCURRENCY = 32;
94     private static final long TIMEOUT = 5000L;
95     private static final Set<String> CAPS_EXI = Set.of(CapabilityURN.BASE, CapabilityURN.EXI);
96     private static final Set<String> CAPS_1_1 = Set.of(CapabilityURN.BASE, CapabilityURN.BASE_1_1);
97     private static final Capabilities EMPTY_CAPABILITIES = new CapabilitiesBuilder().setCapability(Set.of()).build();
98     private static final SessionIdProvider ID_PROVIDER = new DefaultSessionIdProvider();
99
100     private static ExecutorService clientExecutor;
101     private static InetAddress serverAddress;
102     private static int serverPort;
103     private static TcpServerGrouping serverParams;
104     private static TcpClientGrouping clientParams;
105     private static DefaultNetconfTimer timer;
106
107     private static NetconfMessage getConfigMessage;
108     private static NetconfMessage clientHelloMessage;
109
110     private BootstrapFactory serverBootstrapFactory;
111     private NetconfClientFactory clientFactory;
112     private TCPServer server;
113
114     @Mock
115     private NetconfMonitoringService monitoringService;
116     @Mock
117     private SessionListener serverSessionListener;
118     private TestingNetconfOperation testingNetconfOperation;
119
120     @BeforeAll
121     public static void beforeAll() throws Exception {
122         timer = new DefaultNetconfTimer();
123         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
124             int index = 1;
125
126             @Override
127             public Thread newThread(final Runnable runnable) {
128                 Thread thread = new Thread(runnable);
129                 thread.setName("client-" + index++);
130                 thread.setDaemon(true);
131                 return thread;
132             }
133         });
134
135         // create temp socket to get available port for test
136         serverAddress = InetAddress.getLoopbackAddress();
137         try (var socket = new ServerSocket(0)) {
138             serverPort = socket.getLocalPort();
139         }
140
141         final var address = IetfInetUtil.ipAddressFor(serverAddress);
142         final var port = new PortNumber(Uint16.valueOf(serverPort));
143         serverParams = new TcpServerParametersBuilder().setLocalAddress(address).setLocalPort(port).build();
144         clientParams =
145             new TcpClientParametersBuilder().setRemoteAddress(new Host(address)).setRemotePort(port).build();
146
147         getConfigMessage = requireNonNull(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml"));
148         clientHelloMessage = requireNonNull(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/client_hello.xml"));
149     }
150
151     @AfterAll
152     static void afterAll() {
153         clientExecutor.shutdownNow();
154         timer.close();
155     }
156
157     void startServer(final int threads, final Set<String> serverCapabilities) throws Exception {
158         testingNetconfOperation = new TestingNetconfOperation();
159         final var factoriesListener = new AggregatedNetconfOperationServiceFactory();
160         factoriesListener.onAddNetconfOperationServiceFactory(
161             new TestingOperationServiceFactory(testingNetconfOperation));
162
163         doNothing().when(serverSessionListener).onSessionUp(any(NetconfServerSession.class));
164         doNothing().when(serverSessionListener).onSessionDown(any(NetconfServerSession.class));
165         doNothing().when(serverSessionListener).onSessionEvent(any(SessionEvent.class));
166         lenient().doReturn((Registration) () -> {
167         }).when(monitoringService)
168             .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
169         doReturn(serverSessionListener).when(monitoringService).getSessionListener();
170         doReturn(EMPTY_CAPABILITIES).when(monitoringService).getCapabilities();
171
172         serverBootstrapFactory = new BootstrapFactory("server", threads);
173         server = TCPServer.listen(new ServerTransportInitializer(NetconfServerSessionNegotiatorFactory.builder()
174             .setTimer(timer)
175             .setAggregatedOpService(factoriesListener)
176             .setIdProvider(ID_PROVIDER)
177             .setConnectionTimeoutMillis(TIMEOUT)
178             .setMonitoringService(monitoringService)
179             .setBaseCapabilities(serverCapabilities)
180             .build()), serverBootstrapFactory.newServerBootstrap(), serverParams)
181             .get(TIMEOUT, MILLISECONDS);
182     }
183
184     @AfterEach
185     void afterEach() throws Exception {
186         server.shutdown().get(TIMEOUT, MILLISECONDS);
187         serverBootstrapFactory.close();
188         if (clientFactory != null) {
189             clientFactory.close();
190         }
191     }
192
193     @ParameterizedTest
194     @MethodSource
195     @Timeout(CONCURRENCY * 1000)
196     void testConcurrentClients(final int threads, final Class<? extends Runnable> clientClass,
197             final Set<String> serverCaps) throws Exception {
198
199         startServer(threads, serverCaps);
200         clientFactory = clientClass == NetconfClientRunnable.class
201             ? new NetconfClientFactoryImpl(timer, new SSHTransportStackFactory("client", threads)) : null;
202
203         final var futures = new ArrayList<Future<?>>(CONCURRENCY);
204         for (int i = 0; i < CONCURRENCY; i++) {
205             final var runnableClient = clientClass == NetconfClientRunnable.class
206                 ? new NetconfClientRunnable(clientFactory) : new BlockingClientRunnable();
207             futures.add(clientExecutor.submit(runnableClient));
208         }
209         for (var future : futures) {
210             future.get(TIMEOUT, MILLISECONDS);
211         }
212         assertEquals(CONCURRENCY, testingNetconfOperation.counter.get());
213     }
214
215     private static List<Arguments> testConcurrentClients() {
216         return List.of(
217             // (threads, runnable client class, server capabilities)
218             Arguments.of(4, NetconfClientRunnable.class, DEFAULT_BASE_CAPABILITIES),
219             Arguments.of(1, NetconfClientRunnable.class, DEFAULT_BASE_CAPABILITIES),
220             // empty set of capabilities = only base 1.0 netconf capability
221             Arguments.of(4, NetconfClientRunnable.class, Set.of()),
222             Arguments.of(4, NetconfClientRunnable.class, CAPS_EXI),
223             Arguments.of(4, NetconfClientRunnable.class, CAPS_1_1),
224             Arguments.of(4, BlockingClientRunnable.class, CAPS_EXI),
225             Arguments.of(1, BlockingClientRunnable.class, CAPS_EXI)
226         );
227     }
228
229     /**
230      * Responds to all operations except start-exi and counts all requests.
231      */
232     private static final class TestingNetconfOperation implements NetconfOperation {
233         private final AtomicLong counter = new AtomicLong();
234
235         @Override
236         public HandlingPriority canHandle(final Document message) {
237             return XmlUtil.toString(message).contains(NetconfStartExiMessageProvider.START_EXI)
238                 ? null : HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
239         }
240
241         @SuppressWarnings("checkstyle:IllegalCatch")
242         @Override
243         public Document handle(final Document requestMessage,
244                 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
245             LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
246             counter.getAndIncrement();
247             try {
248                 return XmlUtil.readXmlToDocument("<test/>");
249             } catch (Exception e) {
250                 throw new RuntimeException(e);
251             }
252         }
253     }
254
255     /**
256      * Hardcoded operation service factory.
257      */
258     private record TestingOperationServiceFactory(NetconfOperation... operations)
259             implements NetconfOperationServiceFactory {
260
261         @Override
262         public Set<Capability> getCapabilities() {
263             return Set.of();
264         }
265
266         @Override
267         public Registration registerCapabilityListener(final CapabilityListener listener) {
268             // No-op
269             return () -> { };
270         }
271
272         @Override
273         public NetconfOperationService createService(final SessionIdType sessionId) {
274             return new NetconfOperationService() {
275
276                 @Override
277                 public Set<NetconfOperation> getNetconfOperations() {
278                     return Set.of(operations);
279                 }
280
281                 @Override
282                 public void close() {
283                 }
284             };
285         }
286     }
287
288     /**
289      * Pure socket based blocking client.
290      */
291     private record BlockingClientRunnable() implements Runnable {
292         @Override
293         public void run() {
294             try (var clientSocket = new Socket(serverAddress, serverPort)) {
295                 final var outToServer = new DataOutputStream(clientSocket.getOutputStream());
296                 final var inFromServer = new InputStreamReader(clientSocket.getInputStream());
297
298                 var sb = new StringBuilder();
299                 while (!sb.toString().endsWith("]]>]]>")) {
300                     sb.append((char) inFromServer.read());
301                 }
302                 LOG.info(sb.toString());
303
304                 outToServer.write(clientHelloMessage.toString().getBytes(StandardCharsets.UTF_8));
305                 outToServer.write("]]>]]>".getBytes());
306                 outToServer.flush();
307                 // Thread.sleep(100);
308                 outToServer.write(getConfigMessage.toString().getBytes(StandardCharsets.UTF_8));
309                 outToServer.write("]]>]]>".getBytes());
310                 outToServer.flush();
311                 // Thread.sleep(100);
312                 sb = new StringBuilder();
313                 while (!sb.toString().endsWith("]]>]]>")) {
314                     sb.append((char) inFromServer.read());
315                 }
316                 LOG.info(sb.toString());
317             } catch (IOException e) {
318                 throw new IllegalStateException(Thread.currentThread().getName(), e);
319             }
320         }
321     }
322
323     /**
324      * NetconfClientFactory based runnable.
325      */
326     private record NetconfClientRunnable(NetconfClientFactory factory) implements Runnable {
327
328         @SuppressWarnings("checkstyle:IllegalCatch")
329         @Override
330         public void run() {
331             final var sessionListener = new SimpleNetconfClientSessionListener();
332             final var clientConfig = NetconfClientConfigurationBuilder.create()
333                 .withTcpParameters(clientParams).withSessionListener(sessionListener).build();
334             try (var session = factory.createClient(clientConfig).get()) {
335                 final var sessionId = session.sessionId();
336                 LOG.info("Client with session id {}: hello exchanged", sessionId);
337                 final var result = sessionListener.sendRequest(getConfigMessage).get();
338                 LOG.info("Client with session id {}: got result {}", sessionId.getValue(), result);
339
340                 checkState(!NetconfMessageUtil.isErrorMessage(result),
341                     "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
342                         + XmlUtil.toString(getConfigMessage.getDocument()));
343
344                 LOG.info("Client with session id {}: ended", sessionId.getValue());
345             } catch (final Exception e) {
346                 throw new IllegalStateException(Thread.currentThread().getName(), e);
347             }
348         }
349     }
350 }