2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.server;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.google.common.io.ByteStreams;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.netconf.api.CapabilityURN;
import org.opendaylight.netconf.api.DocumentedException;
import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.netconf.api.messages.NetconfMessage;
import org.opendaylight.netconf.api.xml.XmlUtil;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.netconf.client.NetconfMessageUtil;
import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.netconf.client.TestingNetconfClient;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessageProvider;
import org.opendaylight.netconf.server.api.SessionIdProvider;
import org.opendaylight.netconf.server.api.monitoring.Capability;
import org.opendaylight.netconf.server.api.monitoring.CapabilityListener;
import org.opendaylight.netconf.server.api.monitoring.NetconfMonitoringService;
import org.opendaylight.netconf.server.api.monitoring.SessionEvent;
import org.opendaylight.netconf.server.api.monitoring.SessionListener;
import org.opendaylight.netconf.server.api.operations.HandlingPriority;
import org.opendaylight.netconf.server.api.operations.NetconfOperation;
import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
import org.opendaylight.netconf.test.util.XmlFileLoader;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
79 @RunWith(Parameterized.class)
80 public class ConcurrentClientsTest {
81 private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
83 private static ExecutorService clientExecutor;
85 private static final int CONCURRENCY = 32;
86 private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
88 private final int nettyThreads;
89 private final Class<? extends Runnable> clientRunnable;
90 private final Set<String> serverCaps;
/**
 * Creates the test instance for one parameter set.
 *
 * @param nettyThreads number of threads of the Netty event loop group
 * @param clientRunnable client implementation instantiated once per concurrent connection
 * @param serverCaps base capabilities the server is configured with
 */
public ConcurrentClientsTest(final int nettyThreads, final Class<? extends Runnable> clientRunnable,
        final Set<String> serverCaps) {
    this.serverCaps = serverCaps;
    this.clientRunnable = clientRunnable;
    this.nettyThreads = nettyThreads;
}
99 @Parameterized.Parameters()
100 public static Collection<Object[]> data() {
101 return List.of(new Object[][]{
102 { 4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
103 { 1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
104 // empty set of capabilities = only base 1.0 netconf capability
105 { 4, TestingNetconfClientRunnable.class, Set.of()},
106 { 4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
107 { 4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
108 { 4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
109 { 1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
113 private EventLoopGroup nettyGroup;
114 private NetconfClientDispatcher netconfClientDispatcher;
116 HashedWheelTimer hashedWheelTimer;
117 private TestingNetconfOperation testingNetconfOperation;
119 public static NetconfMonitoringService createMockedMonitoringService() {
120 NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
121 final SessionListener sessionListener = mock(SessionListener.class);
122 doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
123 doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
124 doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
125 doReturn((Registration) () -> { }).when(monitoring)
126 .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
127 doReturn(sessionListener).when(monitoring).getSessionListener();
128 doReturn(new CapabilitiesBuilder().setCapability(Set.of()).build()).when(monitoring).getCapabilities();
133 public static void setUpClientExecutor() {
134 clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
138 public Thread newThread(final Runnable runnable) {
139 Thread thread = new Thread(runnable);
140 thread.setName("client-" + index++);
141 thread.setDaemon(true);
148 public void setUp() throws Exception {
149 hashedWheelTimer = new HashedWheelTimer();
150 nettyGroup = new NioEventLoopGroup(nettyThreads);
151 netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
153 AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
155 testingNetconfOperation = new TestingNetconfOperation();
156 factoriesListener.onAddNetconfOperationServiceFactory(
157 new TestingOperationServiceFactory(testingNetconfOperation));
159 SessionIdProvider idProvider = new DefaultSessionIdProvider();
161 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
162 NetconfServerSessionNegotiatorFactoryBuilder()
163 .setTimer(hashedWheelTimer)
164 .setAggregatedOpService(factoriesListener)
165 .setIdProvider(idProvider)
166 .setConnectionTimeoutMillis(5000)
167 .setMonitoringService(createMockedMonitoringService())
168 .setBaseCapabilities(serverCaps)
171 ServerChannelInitializer serverChannelInitializer =
172 new ServerChannelInitializer(serverNegotiatorFactory);
173 final NetconfServerDispatcherImpl dispatch =
174 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
176 ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
181 public void tearDown() {
182 hashedWheelTimer.stop();
184 nettyGroup.shutdownGracefully().get();
185 } catch (InterruptedException | ExecutionException e) {
186 LOG.warn("Ignoring exception while cleaning up after test", e);
191 public static void tearDownClientExecutor() {
192 clientExecutor.shutdownNow();
195 @Test(timeout = CONCURRENCY * 1000)
196 public void testConcurrentClients() throws Exception {
198 List<Future<?>> futures = new ArrayList<>(CONCURRENCY);
200 for (int i = 0; i < CONCURRENCY; i++) {
201 futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
204 for (Future<?> future : futures) {
207 } catch (InterruptedException e) {
208 throw new IllegalStateException(e);
209 } catch (ExecutionException e) {
210 LOG.error("Thread for testing client failed", e);
215 assertEquals(CONCURRENCY, testingNetconfOperation.counter.get());
218 public static Set<String> getOnlyExiServerCaps() {
219 return Set.of(CapabilityURN.BASE, CapabilityURN.EXI);
222 public static Set<String> getOnlyChunkServerCaps() {
223 return Set.of(CapabilityURN.BASE, CapabilityURN.BASE_1_1);
226 public Runnable getInstanceOfClientRunnable() throws Exception {
227 return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
231 * Responds to all operations except start-exi and counts all requests.
233 private static final class TestingNetconfOperation implements NetconfOperation {
234 private final AtomicLong counter = new AtomicLong();
237 public HandlingPriority canHandle(final Document message) {
238 return XmlUtil.toString(message).contains(NetconfStartExiMessageProvider.START_EXI)
239 ? null : HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
242 @SuppressWarnings("checkstyle:IllegalCatch")
244 public Document handle(final Document requestMessage,
245 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
246 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
247 counter.getAndIncrement();
249 return XmlUtil.readXmlToDocument("<test/>");
250 } catch (Exception e) {
251 throw new RuntimeException(e);
257 * Hardcoded operation service factory.
259 private static final class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
/** Operations exposed by every service this factory creates. */
private final NetconfOperation[] operations;

/**
 * Creates a factory serving a fixed set of operations.
 *
 * @param operations operations returned by each created service
 */
TestingOperationServiceFactory(final NetconfOperation... operations) {
    this.operations = operations;
}
267 public Set<Capability> getCapabilities() {
272 public Registration registerCapabilityListener(final CapabilityListener listener) {
278 public NetconfOperationService createService(final SessionIdType sessionId) {
279 return new NetconfOperationService() {
282 public Set<NetconfOperation> getNetconfOperations() {
283 return Set.of(operations);
287 public void close() {
294 * Pure socket based blocking client.
296 public final class BlockingClientRunnable implements Runnable {
299 @SuppressWarnings("checkstyle:IllegalCatch")
303 } catch (Exception e) {
304 throw new IllegalStateException(Thread.currentThread().getName(), e);
308 private void run2() throws Exception {
309 InputStream clientHello = requireNonNull(XmlFileLoader.getResourceAsStream(
310 "netconfMessages/client_hello.xml"));
311 final InputStream getConfig = requireNonNull(XmlFileLoader.getResourceAsStream(
312 "netconfMessages/getConfig.xml"));
314 Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
315 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
316 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
318 StringBuilder sb = new StringBuilder();
319 while (!sb.toString().endsWith("]]>]]>")) {
320 sb.append((char) inFromServer.read());
322 LOG.info(sb.toString());
324 outToServer.write(ByteStreams.toByteArray(clientHello));
325 outToServer.write("]]>]]>".getBytes());
327 // Thread.sleep(100);
328 outToServer.write(ByteStreams.toByteArray(getConfig));
329 outToServer.write("]]>]]>".getBytes());
332 sb = new StringBuilder();
333 while (!sb.toString().endsWith("]]>]]>")) {
334 sb.append((char) inFromServer.read());
336 LOG.info(sb.toString());
337 clientSocket.close();
342 * TestingNetconfClient based runnable.
344 public final class TestingNetconfClientRunnable implements Runnable {
346 @SuppressWarnings("checkstyle:IllegalCatch")
350 final TestingNetconfClient netconfClient =
351 new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
353 final var sessionId = netconfClient.sessionId();
354 LOG.info("Client with session id {}: hello exchanged", sessionId);
356 final NetconfMessage getMessage = XmlFileLoader
357 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
358 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
359 LOG.info("Client with session id {}: got result {}", sessionId.getValue(), result);
361 checkState(NetconfMessageUtil.isErrorMessage(result) == false,
362 "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
363 + XmlUtil.toString(getMessage.getDocument()));
365 netconfClient.close();
366 LOG.info("Client with session id {}: ended", sessionId.getValue());
367 } catch (final Exception e) {
368 throw new IllegalStateException(Thread.currentThread().getName(), e);
372 private NetconfClientConfiguration getClientConfig() {
373 return NetconfClientConfigurationBuilder.create()
374 .withAddress(NETCONF_ADDRESS)
375 .withAdditionalHeader(
376 new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client"))
377 .withSessionListener(new SimpleNetconfClientSessionListener())