/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.server;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static java.util.concurrent.TimeUnit.MILLISECONDS;
13 import static org.junit.jupiter.api.Assertions.assertEquals;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.lenient;
18 import static org.opendaylight.netconf.server.NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.net.InetAddress;
24 import java.net.ServerSocket;
25 import java.net.Socket;
26 import java.nio.charset.StandardCharsets;
27 import java.util.ArrayList;
28 import java.util.List;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.atomic.AtomicLong;
35 import org.junit.jupiter.api.AfterAll;
36 import org.junit.jupiter.api.AfterEach;
37 import org.junit.jupiter.api.BeforeAll;
38 import org.junit.jupiter.api.Timeout;
39 import org.junit.jupiter.api.extension.ExtendWith;
40 import org.junit.jupiter.params.ParameterizedTest;
41 import org.junit.jupiter.params.provider.Arguments;
42 import org.junit.jupiter.params.provider.MethodSource;
43 import org.mockito.Mock;
44 import org.mockito.junit.jupiter.MockitoExtension;
45 import org.opendaylight.netconf.api.CapabilityURN;
46 import org.opendaylight.netconf.api.DocumentedException;
47 import org.opendaylight.netconf.api.messages.NetconfMessage;
48 import org.opendaylight.netconf.api.xml.XmlUtil;
49 import org.opendaylight.netconf.client.NetconfClientFactory;
50 import org.opendaylight.netconf.client.NetconfClientFactoryImpl;
51 import org.opendaylight.netconf.client.NetconfMessageUtil;
52 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
53 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
54 import org.opendaylight.netconf.common.impl.DefaultNetconfTimer;
55 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessageProvider;
56 import org.opendaylight.netconf.server.api.SessionIdProvider;
57 import org.opendaylight.netconf.server.api.monitoring.Capability;
58 import org.opendaylight.netconf.server.api.monitoring.CapabilityListener;
59 import org.opendaylight.netconf.server.api.monitoring.NetconfMonitoringService;
60 import org.opendaylight.netconf.server.api.monitoring.SessionEvent;
61 import org.opendaylight.netconf.server.api.monitoring.SessionListener;
62 import org.opendaylight.netconf.server.api.operations.HandlingPriority;
63 import org.opendaylight.netconf.server.api.operations.NetconfOperation;
64 import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
65 import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
66 import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
67 import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
68 import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
69 import org.opendaylight.netconf.test.util.XmlFileLoader;
70 import org.opendaylight.netconf.transport.ssh.SSHTransportStackFactory;
71 import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
72 import org.opendaylight.netconf.transport.tcp.TCPServer;
73 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
74 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
75 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.client.rev231228.netconf.client.initiate.stack.grouping.transport.ssh.ssh.TcpClientParametersBuilder;
78 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Capabilities;
79 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
80 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.server.rev231228.netconf.server.listen.stack.grouping.transport.ssh.ssh.TcpServerParametersBuilder;
81 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev231228.TcpClientGrouping;
82 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev231228.TcpServerGrouping;
83 import org.opendaylight.yangtools.concepts.Registration;
84 import org.opendaylight.yangtools.yang.common.Uint16;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87 import org.w3c.dom.Document;
89 @ExtendWith(MockitoExtension.class)
90 public class ConcurrentClientsTest {
// Shared logger, also used by the nested client and operation helpers.
91 private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
// Size of the client thread pool and number of clients submitted per test invocation.
93 private static final int CONCURRENCY = 32;
// Milliseconds; used for the server connection timeout and for every bounded Future.get().
94 private static final long TIMEOUT = 5000L;
// Capability sets referenced by the parameterized test's argument provider.
95 private static final Set<String> CAPS_EXI = Set.of(CapabilityURN.BASE, CapabilityURN.EXI);
96 private static final Set<String> CAPS_1_1 = Set.of(CapabilityURN.BASE, CapabilityURN.BASE_1_1);
// Value returned by the stubbed monitoringService.getCapabilities().
97 private static final Capabilities EMPTY_CAPABILITIES = new CapabilitiesBuilder().setCapability(Set.of()).build();
98 private static final SessionIdProvider ID_PROVIDER = new DefaultSessionIdProvider();
// Static fixture state, set up once in beforeAll().
100 private static ExecutorService clientExecutor;
101 private static InetAddress serverAddress;
102 private static int serverPort;
103 private static TcpServerGrouping serverParams;
104 private static TcpClientGrouping clientParams;
105 private static DefaultNetconfTimer timer;
// Request messages loaded from test resources in beforeAll().
107 private static NetconfMessage getConfigMessage;
108 private static NetconfMessage clientHelloMessage;
// Per-test state: assigned in startServer()/testConcurrentClients(), released in afterEach().
110 private BootstrapFactory serverBootstrapFactory;
111 private NetconfClientFactory clientFactory;
112 private TCPServer server;
// NOTE(review): the next two fields are stubbed with Mockito in startServer(), so they are presumably
// injected as mocks by MockitoExtension; the annotation is not visible here, confirm.
115 private NetconfMonitoringService monitoringService;
117 private SessionListener serverSessionListener;
// Counting operation installed by startServer() and read by the test's final assertion.
118 private TestingNetconfOperation testingNetconfOperation;
/**
 * One-time fixture setup shared by all test invocations.
 *
 * <p>Creates the NETCONF timer and a fixed pool of {@code CONCURRENCY} daemon threads named
 * {@code client-N}, probes for a free port, builds TCP server and client parameters for the loopback
 * address and that port, and loads the get-config and client-hello messages from test resources.
 *
 * @throws Exception if any setup step fails
 */
121 public static void beforeAll() throws Exception {
122 timer = new DefaultNetconfTimer();
// Custom thread factory: gives every client thread a recognizable name for logs and failure messages.
123 clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
127 public Thread newThread(final Runnable runnable) {
128 Thread thread = new Thread(runnable);
129 thread.setName("client-" + index++);
// Daemon threads do not block JVM exit.
130 thread.setDaemon(true);
135 // create temp socket to get available port for test
136 serverAddress = InetAddress.getLoopbackAddress();
// NOTE: try-with-resources closes the probe socket again before the server binds, so the port is
// expected, not guaranteed, to still be free when the server starts.
137 try (var socket = new ServerSocket(0)) {
138 serverPort = socket.getLocalPort();
141 final var address = IetfInetUtil.ipAddressFor(serverAddress);
142 final var port = new PortNumber(Uint16.valueOf(serverPort));
143 serverParams = new TcpServerParametersBuilder().setLocalAddress(address).setLocalPort(port).build();
// NOTE(review): the parameters built here are presumably assigned to clientParams (read by
// NetconfClientRunnable); the assignment target is not visible on this line, confirm.
145 new TcpClientParametersBuilder().setRemoteAddress(new Host(address)).setRemotePort(port).build();
// requireNonNull makes a missing or unloadable resource fail here rather than inside a client thread.
147 getConfigMessage = requireNonNull(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml"));
148 clientHelloMessage = requireNonNull(XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/client_hello.xml"));
/**
 * One-time teardown: shuts down the shared client executor with {@code shutdownNow()}, which also
 * attempts to stop client tasks that are still running.
 */
152 static void afterAll() {
153 clientExecutor.shutdownNow();
/**
 * Starts the NETCONF TCP server used by a single test invocation.
 *
 * <p>Registers a fresh {@link TestingNetconfOperation} through an aggregated operation service
 * factory, stubs the session listener and the monitoring service, then listens on
 * {@code serverParams} and waits up to {@code TIMEOUT} milliseconds for the listen future.
 *
 * @param threads thread-count argument passed to the server {@link BootstrapFactory}
 * @param serverCapabilities base capabilities passed to the negotiator factory builder
 * @throws Exception if server startup fails or does not complete within the timeout
 */
157 void startServer(final int threads, final Set<String> serverCapabilities) throws Exception {
// New counter per invocation, so the final assertion only sees this run's requests.
158 testingNetconfOperation = new TestingNetconfOperation();
159 final var factoriesListener = new AggregatedNetconfOperationServiceFactory();
160 factoriesListener.onAddNetconfOperationServiceFactory(
161 new TestingOperationServiceFactory(testingNetconfOperation));
// Session lifecycle callbacks are accepted and ignored.
163 doNothing().when(serverSessionListener).onSessionUp(any(NetconfServerSession.class));
164 doNothing().when(serverSessionListener).onSessionDown(any(NetconfServerSession.class));
165 doNothing().when(serverSessionListener).onSessionEvent(any(SessionEvent.class));
// lenient() exempts this stubbing from Mockito's strict unused-stubbing check; the returned
// Registration is a no-op lambda.
166 lenient().doReturn((Registration) () -> {
167 }).when(monitoringService)
168 .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
169 doReturn(serverSessionListener).when(monitoringService).getSessionListener();
170 doReturn(EMPTY_CAPABILITIES).when(monitoringService).getCapabilities();
// Kept in a field so that afterEach() can close it.
172 serverBootstrapFactory = new BootstrapFactory("server", threads);
// Blocks until the listen future completes, or fails after TIMEOUT milliseconds.
173 server = TCPServer.listen(new ServerTransportInitializer(NetconfServerSessionNegotiatorFactory.builder()
175 .setAggregatedOpService(factoriesListener)
176 .setIdProvider(ID_PROVIDER)
177 .setConnectionTimeoutMillis(TIMEOUT)
178 .setMonitoringService(monitoringService)
179 .setBaseCapabilities(serverCapabilities)
180 .build()), serverBootstrapFactory.newServerBootstrap(), serverParams)
181 .get(TIMEOUT, MILLISECONDS);
185 void afterEach() throws Exception {
186 server.shutdown().get(TIMEOUT, MILLISECONDS);
187 serverBootstrapFactory.close();
188 if (clientFactory != null) {
189 clientFactory.close();
195 @Timeout(CONCURRENCY * 1000)
196 void testConcurrentClients(final int threads, final Class<? extends Runnable> clientClass,
197 final Set<String> serverCaps) throws Exception {
199 startServer(threads, serverCaps);
200 clientFactory = clientClass == NetconfClientRunnable.class
201 ? new NetconfClientFactoryImpl(timer, new SSHTransportStackFactory("client", threads)) : null;
203 final var futures = new ArrayList<Future<?>>(CONCURRENCY);
204 for (int i = 0; i < CONCURRENCY; i++) {
205 final var runnableClient = clientClass == NetconfClientRunnable.class
206 ? new NetconfClientRunnable(clientFactory) : new BlockingClientRunnable();
207 futures.add(clientExecutor.submit(runnableClient));
209 for (var future : futures) {
210 future.get(TIMEOUT, MILLISECONDS);
212 assertEquals(CONCURRENCY, testingNetconfOperation.counter.get());
/**
 * Supplies the argument tuples for the {@code testConcurrentClients} test; it shares that test's
 * name.
 *
 * @return tuples of (threads, runnable client class, server capabilities)
 */
215 private static List<Arguments> testConcurrentClients() {
217 // (threads, runnable client class, server capabilities)
218 Arguments.of(4, NetconfClientRunnable.class, DEFAULT_BASE_CAPABILITIES),
219 Arguments.of(1, NetconfClientRunnable.class, DEFAULT_BASE_CAPABILITIES),
220 // empty set of capabilities = only base 1.0 netconf capability
221 Arguments.of(4, NetconfClientRunnable.class, Set.of()),
222 Arguments.of(4, NetconfClientRunnable.class, CAPS_EXI),
223 Arguments.of(4, NetconfClientRunnable.class, CAPS_1_1),
// The raw-socket client is only run against the EXI capability set, with 4 threads and with 1.
224 Arguments.of(4, BlockingClientRunnable.class, CAPS_EXI),
225 Arguments.of(1, BlockingClientRunnable.class, CAPS_EXI)
/**
 * Responds to all operations except start-exi and counts all requests.
 */
232 private static final class TestingNetconfOperation implements NetconfOperation {
// Number of requests passed to handle(); read by the test's final assertion. Atomic because
// requests come from CONCURRENCY concurrently running clients.
233 private final AtomicLong counter = new AtomicLong();
// Returns null when the serialized message contains START_EXI, otherwise
// HANDLE_WITH_MAX_PRIORITY.
236 public HandlingPriority canHandle(final Document message) {
237 return XmlUtil.toString(message).contains(NetconfStartExiMessageProvider.START_EXI)
238 ? null : HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
241 @SuppressWarnings("checkstyle:IllegalCatch")
// Logs the request, increments the counter and answers with a fixed <test/> document;
// subsequentOperation is not used in the visible code.
243 public Document handle(final Document requestMessage,
244 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
245 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
246 counter.getAndIncrement();
248 return XmlUtil.readXmlToDocument("<test/>");
// Any failure while building the reply is rethrown unchecked with the original cause attached.
249 } catch (Exception e) {
250 throw new RuntimeException(e);
/**
 * Hardcoded operation service factory.
 */
// Factory whose services always expose exactly the operations given at construction.
258 private record TestingOperationServiceFactory(NetconfOperation... operations)
259 implements NetconfOperationServiceFactory {
262 public Set<Capability> getCapabilities() {
267 public Registration registerCapabilityListener(final CapabilityListener listener) {
// sessionId is not used in the visible code: every session is handed the same operations.
273 public NetconfOperationService createService(final SessionIdType sessionId) {
274 return new NetconfOperationService() {
277 public Set<NetconfOperation> getNetconfOperations() {
// Set.of(...) yields an immutable set and rejects null or duplicate elements.
278 return Set.of(operations);
282 public void close() {
289 * Pure socket based blocking client.
291 private record BlockingClientRunnable() implements Runnable {
294 try (var clientSocket = new Socket(serverAddress, serverPort)) {
295 final var outToServer = new DataOutputStream(clientSocket.getOutputStream());
296 final var inFromServer = new InputStreamReader(clientSocket.getInputStream());
298 var sb = new StringBuilder();
299 while (!sb.toString().endsWith("]]>]]>")) {
300 sb.append((char) inFromServer.read());
302 LOG.info(sb.toString());
304 outToServer.write(clientHelloMessage.toString().getBytes(StandardCharsets.UTF_8));
305 outToServer.write("]]>]]>".getBytes());
307 // Thread.sleep(100);
308 outToServer.write(getConfigMessage.toString().getBytes(StandardCharsets.UTF_8));
309 outToServer.write("]]>]]>".getBytes());
311 // Thread.sleep(100);
312 sb = new StringBuilder();
313 while (!sb.toString().endsWith("]]>]]>")) {
314 sb.append((char) inFromServer.read());
316 LOG.info(sb.toString());
317 } catch (IOException e) {
318 throw new IllegalStateException(Thread.currentThread().getName(), e);
/**
 * NetconfClientFactory based runnable.
 */
// Client that connects through the supplied factory, sends the shared get-config request once and
// fails if the reply is an error message.
326 private record NetconfClientRunnable(NetconfClientFactory factory) implements Runnable {
328 @SuppressWarnings("checkstyle:IllegalCatch")
// Each client gets its own listener, which is also used to send the request.
331 final var sessionListener = new SimpleNetconfClientSessionListener();
332 final var clientConfig = NetconfClientConfigurationBuilder.create()
333 .withTcpParameters(clientParams).withSessionListener(sessionListener).build();
// get() waits without a timeout of its own; the submitting test bounds the wait through
// Future.get(TIMEOUT, MILLISECONDS). try-with-resources closes the session afterwards.
334 try (var session = factory.createClient(clientConfig).get()) {
335 final var sessionId = session.sessionId();
336 LOG.info("Client with session id {}: hello exchanged", sessionId);
337 final var result = sessionListener.sendRequest(getConfigMessage).get();
338 LOG.info("Client with session id {}: got result {}", sessionId.getValue(), result);
// An error reply fails the client; the message carries both the response and the request XML.
340 checkState(!NetconfMessageUtil.isErrorMessage(result),
341 "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
342 + XmlUtil.toString(getConfigMessage.getDocument()));
344 LOG.info("Client with session id {}: ended", sessionId.getValue());
// Any failure is rethrown unchecked, tagged with the client thread name and keeping the cause.
345 } catch (final Exception e) {
346 throw new IllegalStateException(Thread.currentThread().getName(), e);