2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.server;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static java.util.concurrent.TimeUnit.MILLISECONDS;
13 import static org.junit.jupiter.api.Assertions.assertEquals;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.lenient;
18 import static org.opendaylight.netconf.server.NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES;
20 import io.netty.util.HashedWheelTimer;
21 import java.io.DataOutputStream;
22 import java.io.IOException;
23 import java.io.InputStreamReader;
24 import java.net.InetAddress;
25 import java.net.ServerSocket;
26 import java.net.Socket;
27 import java.nio.charset.StandardCharsets;
28 import java.util.ArrayList;
29 import java.util.List;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.atomic.AtomicLong;
36 import org.junit.jupiter.api.AfterAll;
37 import org.junit.jupiter.api.AfterEach;
38 import org.junit.jupiter.api.BeforeAll;
39 import org.junit.jupiter.api.BeforeEach;
40 import org.junit.jupiter.api.Timeout;
41 import org.junit.jupiter.api.extension.ExtendWith;
42 import org.junit.jupiter.params.ParameterizedTest;
43 import org.junit.jupiter.params.provider.Arguments;
44 import org.junit.jupiter.params.provider.MethodSource;
45 import org.mockito.Mock;
46 import org.mockito.junit.jupiter.MockitoExtension;
47 import org.opendaylight.netconf.api.CapabilityURN;
48 import org.opendaylight.netconf.api.DocumentedException;
49 import org.opendaylight.netconf.api.messages.NetconfMessage;
50 import org.opendaylight.netconf.api.xml.XmlUtil;
51 import org.opendaylight.netconf.client.NetconfClientFactory;
52 import org.opendaylight.netconf.client.NetconfClientFactoryImpl;
53 import org.opendaylight.netconf.client.NetconfMessageUtil;
54 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
55 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
56 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessageProvider;
57 import org.opendaylight.netconf.server.api.SessionIdProvider;
58 import org.opendaylight.netconf.server.api.monitoring.Capability;
59 import org.opendaylight.netconf.server.api.monitoring.CapabilityListener;
60 import org.opendaylight.netconf.server.api.monitoring.NetconfMonitoringService;
61 import org.opendaylight.netconf.server.api.monitoring.SessionEvent;
62 import org.opendaylight.netconf.server.api.monitoring.SessionListener;
63 import org.opendaylight.netconf.server.api.operations.HandlingPriority;
64 import org.opendaylight.netconf.server.api.operations.NetconfOperation;
65 import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
66 import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
67 import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
68 import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
69 import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
70 import org.opendaylight.netconf.test.util.XmlFileLoader;
71 import org.opendaylight.netconf.transport.ssh.SSHTransportStackFactory;
72 import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
73 import org.opendaylight.netconf.transport.tcp.TCPServer;
74 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
75 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
78 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.client.rev231228.netconf.client.initiate.stack.grouping.transport.ssh.ssh.TcpClientParametersBuilder;
79 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
80 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
81 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.server.rev231228.netconf.server.listen.stack.grouping.transport.ssh.ssh.TcpServerParametersBuilder;
82 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev231228.TcpClientGrouping;
83 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev231228.TcpServerGrouping;
84 import org.opendaylight.yangtools.concepts.Registration;
85 import org.opendaylight.yangtools.yang.common.Uint16;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88 import org.w3c.dom.Document;
90 @ExtendWith(MockitoExtension.class)
91 public class ConcurrentClientsTest {
92 private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
94 private static final int CONCURRENCY = 32;
95 private static final long TIMEOUT = 5000L;
96 private static final Set<String> CAPS_EXI = Set.of(CapabilityURN.BASE, CapabilityURN.EXI);
97 private static final Set<String> CAPS_1_1 = Set.of(CapabilityURN.BASE, CapabilityURN.BASE_1_1);
98 private static final Capabilities EMPTY_CAPABILITIES = new CapabilitiesBuilder().setCapability(Set.of()).build();
99 private static final SessionIdProvider ID_PROVIDER = new DefaultSessionIdProvider();
101 private static ExecutorService clientExecutor;
102 private static InetAddress serverAddress;
103 private static int serverPort;
104 private static TcpServerGrouping serverParams;
105 private static TcpClientGrouping clientParams;
107 private static NetconfMessage getConfigMessage;
108 private static NetconfMessage clientHelloMessage;
110 private HashedWheelTimer hashedWheelTimer;
111 private BootstrapFactory serverBootstrapFactory;
112 private NetconfClientFactory clientFactory;
113 private TCPServer server;
116 private NetconfMonitoringService monitoringService;
118 private SessionListener serverSessionListener;
119 private TestingNetconfOperation testingNetconfOperation;
122 public static void beforeAll() throws Exception {
123 clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
127 public Thread newThread(final Runnable runnable) {
128 Thread thread = new Thread(runnable);
129 thread.setName("client-" + index++);
130 thread.setDaemon(true);
135 // create temp socket to get available port for test
136 serverAddress = InetAddress.getLoopbackAddress();
137 try (var socket = new ServerSocket(0)) {
138 serverPort = socket.getLocalPort();
141 final var address = IetfInetUtil.ipAddressFor(serverAddress);
142 final var port = new PortNumber(Uint16.valueOf(serverPort));
143 serverParams = new TcpServerParametersBuilder().setLocalAddress(address).setLocalPort(port).build();
145 new TcpClientParametersBuilder().setRemoteAddress(new Host(address)).setRemotePort(port).build();
147 getConfigMessage = requireNonNull(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml"));
148 clientHelloMessage = requireNonNull(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/client_hello.xml"));
152 static void afterAll() {
153 clientExecutor.shutdownNow();
158 hashedWheelTimer = new HashedWheelTimer();
161 void startServer(final int threads, final Set<String> serverCapabilities) throws Exception {
162 testingNetconfOperation = new TestingNetconfOperation();
163 final var factoriesListener = new AggregatedNetconfOperationServiceFactory();
164 factoriesListener.onAddNetconfOperationServiceFactory(
165 new TestingOperationServiceFactory(testingNetconfOperation));
167 doNothing().when(serverSessionListener).onSessionUp(any(NetconfServerSession.class));
168 doNothing().when(serverSessionListener).onSessionDown(any(NetconfServerSession.class));
169 doNothing().when(serverSessionListener).onSessionEvent(any(SessionEvent.class));
170 lenient().doReturn((Registration) () -> {
171 }).when(monitoringService)
172 .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
173 doReturn(serverSessionListener).when(monitoringService).getSessionListener();
174 doReturn(EMPTY_CAPABILITIES).when(monitoringService).getCapabilities();
176 serverBootstrapFactory = new BootstrapFactory("server", threads);
177 server = TCPServer.listen(new ServerTransportInitializer(NetconfServerSessionNegotiatorFactory.builder()
178 .setTimer(hashedWheelTimer)
179 .setAggregatedOpService(factoriesListener)
180 .setIdProvider(ID_PROVIDER)
181 .setConnectionTimeoutMillis(TIMEOUT)
182 .setMonitoringService(monitoringService)
183 .setBaseCapabilities(serverCapabilities)
184 .build()), serverBootstrapFactory.newServerBootstrap(), serverParams)
185 .get(TIMEOUT, MILLISECONDS);
189 void afterEach() throws Exception {
190 hashedWheelTimer.stop();
191 server.shutdown().get(TIMEOUT, MILLISECONDS);
192 serverBootstrapFactory.close();
193 if (clientFactory != null) {
194 clientFactory.close();
200 @Timeout(CONCURRENCY * 1000)
201 void testConcurrentClients(final int threads, final Class<? extends Runnable> clientClass,
202 final Set<String> serverCaps) throws Exception {
204 startServer(threads, serverCaps);
205 clientFactory = clientClass == NetconfClientRunnable.class
206 ? new NetconfClientFactoryImpl(new SSHTransportStackFactory("client", threads)) : null;
208 final var futures = new ArrayList<Future<?>>(CONCURRENCY);
209 for (int i = 0; i < CONCURRENCY; i++) {
210 final var runnableClient = clientClass == NetconfClientRunnable.class
211 ? new NetconfClientRunnable(clientFactory) : new BlockingClientRunnable();
212 futures.add(clientExecutor.submit(runnableClient));
214 for (var future : futures) {
215 future.get(TIMEOUT, MILLISECONDS);
217 assertEquals(CONCURRENCY, testingNetconfOperation.counter.get());
220 private static List<Arguments> testConcurrentClients() {
222 // (threads, runnable client class, server capabilities)
223 Arguments.of(4, NetconfClientRunnable.class, DEFAULT_BASE_CAPABILITIES),
224 Arguments.of(1, NetconfClientRunnable.class, DEFAULT_BASE_CAPABILITIES),
225 // empty set of capabilities = only base 1.0 netconf capability
226 Arguments.of(4, NetconfClientRunnable.class, Set.of()),
227 Arguments.of(4, NetconfClientRunnable.class, CAPS_EXI),
228 Arguments.of(4, NetconfClientRunnable.class, CAPS_1_1),
229 Arguments.of(4, BlockingClientRunnable.class, CAPS_EXI),
230 Arguments.of(1, BlockingClientRunnable.class, CAPS_EXI)
235 * Responds to all operations except start-exi and counts all requests.
237 private static final class TestingNetconfOperation implements NetconfOperation {
238 private final AtomicLong counter = new AtomicLong();
241 public HandlingPriority canHandle(final Document message) {
242 return XmlUtil.toString(message).contains(NetconfStartExiMessageProvider.START_EXI)
243 ? null : HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
246 @SuppressWarnings("checkstyle:IllegalCatch")
248 public Document handle(final Document requestMessage,
249 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
250 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
251 counter.getAndIncrement();
253 return XmlUtil.readXmlToDocument("<test/>");
254 } catch (Exception e) {
255 throw new RuntimeException(e);
261 * Hardcoded operation service factory.
263 private record TestingOperationServiceFactory(NetconfOperation... operations)
264 implements NetconfOperationServiceFactory {
267 public Set<Capability> getCapabilities() {
272 public Registration registerCapabilityListener(final CapabilityListener listener) {
278 public NetconfOperationService createService(final SessionIdType sessionId) {
279 return new NetconfOperationService() {
282 public Set<NetconfOperation> getNetconfOperations() {
283 return Set.of(operations);
287 public void close() {
294 * Pure socket based blocking client.
296 private record BlockingClientRunnable() implements Runnable {
299 try (var clientSocket = new Socket(serverAddress, serverPort)) {
300 final var outToServer = new DataOutputStream(clientSocket.getOutputStream());
301 final var inFromServer = new InputStreamReader(clientSocket.getInputStream());
303 var sb = new StringBuilder();
304 while (!sb.toString().endsWith("]]>]]>")) {
305 sb.append((char) inFromServer.read());
307 LOG.info(sb.toString());
309 outToServer.write(clientHelloMessage.toString().getBytes(StandardCharsets.UTF_8));
310 outToServer.write("]]>]]>".getBytes());
312 // Thread.sleep(100);
313 outToServer.write(getConfigMessage.toString().getBytes(StandardCharsets.UTF_8));
314 outToServer.write("]]>]]>".getBytes());
316 // Thread.sleep(100);
317 sb = new StringBuilder();
318 while (!sb.toString().endsWith("]]>]]>")) {
319 sb.append((char) inFromServer.read());
321 LOG.info(sb.toString());
322 } catch (IOException e) {
323 throw new IllegalStateException(Thread.currentThread().getName(), e);
329 * NetconfClientFactory based runnable.
331 private record NetconfClientRunnable(NetconfClientFactory factory) implements Runnable {
333 @SuppressWarnings("checkstyle:IllegalCatch")
336 final var sessionListener = new SimpleNetconfClientSessionListener();
337 final var clientConfig = NetconfClientConfigurationBuilder.create()
338 .withTcpParameters(clientParams).withSessionListener(sessionListener).build();
339 try (var session = factory.createClient(clientConfig).get()) {
340 final var sessionId = session.sessionId();
341 LOG.info("Client with session id {}: hello exchanged", sessionId);
342 final var result = sessionListener.sendRequest(getConfigMessage).get();
343 LOG.info("Client with session id {}: got result {}", sessionId.getValue(), result);
345 checkState(!NetconfMessageUtil.isErrorMessage(result),
346 "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
347 + XmlUtil.toString(getConfigMessage.getDocument()));
349 LOG.info("Client with session id {}: ended", sessionId.getValue());
350 } catch (final Exception e) {
351 throw new IllegalStateException(Thread.currentThread().getName(), e);