2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.server;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
18 import com.google.common.collect.Sets;
19 import com.google.common.io.ByteStreams;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.channel.nio.NioEventLoopGroup;
23 import io.netty.util.HashedWheelTimer;
24 import io.netty.util.concurrent.GlobalEventExecutor;
25 import java.io.DataOutputStream;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.net.InetSocketAddress;
29 import java.net.Socket;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.List;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.atomic.AtomicLong;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.junit.runners.Parameterized;
49 import org.opendaylight.netconf.api.DocumentedException;
50 import org.opendaylight.netconf.api.NetconfMessage;
51 import org.opendaylight.netconf.api.capability.Capability;
52 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
53 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
54 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
55 import org.opendaylight.netconf.api.monitoring.SessionEvent;
56 import org.opendaylight.netconf.api.monitoring.SessionListener;
57 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
58 import org.opendaylight.netconf.api.xml.XmlUtil;
59 import org.opendaylight.netconf.client.NetconfClientDispatcher;
60 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
61 import org.opendaylight.netconf.client.NetconfMessageUtil;
62 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
63 import org.opendaylight.netconf.client.TestingNetconfClient;
64 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
65 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
66 import org.opendaylight.netconf.mapping.api.HandlingPriority;
67 import org.opendaylight.netconf.mapping.api.NetconfOperation;
68 import org.opendaylight.netconf.mapping.api.NetconfOperationChainedExecution;
69 import org.opendaylight.netconf.mapping.api.NetconfOperationService;
70 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
71 import org.opendaylight.netconf.nettyutil.NeverReconnectStrategy;
72 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
73 import org.opendaylight.netconf.server.api.SessionIdProvider;
74 import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
75 import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
76 import org.opendaylight.netconf.util.test.XmlFileLoader;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
78 import org.opendaylight.yangtools.concepts.Registration;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81 import org.w3c.dom.Document;
83 @RunWith(Parameterized.class)
84 public class ConcurrentClientsTest {
85 private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
87 private static ExecutorService clientExecutor;
89 private static final int CONCURRENCY = 32;
90 private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
92 private final int nettyThreads;
93 private final Class<? extends Runnable> clientRunnable;
94 private final Set<String> serverCaps;
96 public ConcurrentClientsTest(final int nettyThreads, final Class<? extends Runnable> clientRunnable,
97 final Set<String> serverCaps) {
98 this.nettyThreads = nettyThreads;
99 this.clientRunnable = clientRunnable;
100 this.serverCaps = serverCaps;
103 @Parameterized.Parameters()
104 public static Collection<Object[]> data() {
105 return Arrays.asList(new Object[][]{
106 { 4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
107 { 1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
108 // empty set of capabilities = only base 1.0 netconf capability
109 { 4, TestingNetconfClientRunnable.class, Collections.emptySet()},
110 { 4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
111 { 4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
112 { 4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
113 { 1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
117 private EventLoopGroup nettyGroup;
118 private NetconfClientDispatcher netconfClientDispatcher;
120 HashedWheelTimer hashedWheelTimer;
121 private TestingNetconfOperation testingNetconfOperation;
123 public static NetconfMonitoringService createMockedMonitoringService() {
124 NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
125 final SessionListener sessionListener = mock(SessionListener.class);
126 doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
127 doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
128 doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
129 doReturn((Registration) () -> { }).when(monitoring)
130 .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
131 doReturn(sessionListener).when(monitoring).getSessionListener();
132 doReturn(new CapabilitiesBuilder().setCapability(Set.of()).build()).when(monitoring).getCapabilities();
137 public static void setUpClientExecutor() {
138 clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
142 public Thread newThread(final Runnable runnable) {
143 Thread thread = new Thread(runnable);
144 thread.setName("client-" + index++);
145 thread.setDaemon(true);
152 public void setUp() throws Exception {
153 hashedWheelTimer = new HashedWheelTimer();
154 nettyGroup = new NioEventLoopGroup(nettyThreads);
155 netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
157 AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
159 testingNetconfOperation = new TestingNetconfOperation();
160 factoriesListener.onAddNetconfOperationServiceFactory(
161 new TestingOperationServiceFactory(testingNetconfOperation));
163 SessionIdProvider idProvider = new DefaultSessionIdProvider();
165 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
166 NetconfServerSessionNegotiatorFactoryBuilder()
167 .setTimer(hashedWheelTimer)
168 .setAggregatedOpService(factoriesListener)
169 .setIdProvider(idProvider)
170 .setConnectionTimeoutMillis(5000)
171 .setMonitoringService(createMockedMonitoringService())
172 .setBaseCapabilities(serverCaps)
175 ServerChannelInitializer serverChannelInitializer =
176 new ServerChannelInitializer(serverNegotiatorFactory);
177 final NetconfServerDispatcherImpl dispatch =
178 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
180 ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
185 public void tearDown() {
186 hashedWheelTimer.stop();
188 nettyGroup.shutdownGracefully().get();
189 } catch (InterruptedException | ExecutionException e) {
190 LOG.warn("Ignoring exception while cleaning up after test", e);
195 public static void tearDownClientExecutor() {
196 clientExecutor.shutdownNow();
199 @Test(timeout = CONCURRENCY * 1000)
200 public void testConcurrentClients() throws Exception {
202 List<Future<?>> futures = new ArrayList<>(CONCURRENCY);
204 for (int i = 0; i < CONCURRENCY; i++) {
205 futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
208 for (Future<?> future : futures) {
211 } catch (InterruptedException e) {
212 throw new IllegalStateException(e);
213 } catch (ExecutionException e) {
214 LOG.error("Thread for testing client failed", e);
219 assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
222 public static Set<String> getOnlyExiServerCaps() {
223 return Sets.newHashSet(
224 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
225 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
229 public static Set<String> getOnlyChunkServerCaps() {
230 return Sets.newHashSet(
231 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
232 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
236 public Runnable getInstanceOfClientRunnable() throws Exception {
237 return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
241 * Responds to all operations except start-exi and counts all requests.
243 private static class TestingNetconfOperation implements NetconfOperation {
245 private final AtomicLong counter = new AtomicLong();
248 public HandlingPriority canHandle(final Document message) {
249 return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI)
250 ? HandlingPriority.CANNOT_HANDLE :
251 HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
254 @SuppressWarnings("checkstyle:IllegalCatch")
256 public Document handle(final Document requestMessage,
257 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
259 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
260 counter.getAndIncrement();
261 return XmlUtil.readXmlToDocument("<test/>");
262 } catch (Exception e) {
263 throw new RuntimeException(e);
267 public long getMessageCount() {
268 return counter.get();
273 * Hardcoded operation service factory.
275 private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
276 private final NetconfOperation[] operations;
278 TestingOperationServiceFactory(final NetconfOperation... operations) {
279 this.operations = operations;
283 public Set<Capability> getCapabilities() {
284 return Collections.emptySet();
288 public Registration registerCapabilityListener(final CapabilityListener listener) {
294 public NetconfOperationService createService(final String netconfSessionIdForReporting) {
295 return new NetconfOperationService() {
298 public Set<NetconfOperation> getNetconfOperations() {
299 return Sets.newHashSet(operations);
303 public void close() {
310 * Pure socket based blocking client.
312 public final class BlockingClientRunnable implements Runnable {
315 @SuppressWarnings("checkstyle:IllegalCatch")
319 } catch (Exception e) {
320 throw new IllegalStateException(Thread.currentThread().getName(), e);
324 private void run2() throws Exception {
325 InputStream clientHello = requireNonNull(XmlFileLoader.getResourceAsStream(
326 "netconfMessages/client_hello.xml"));
327 final InputStream getConfig = requireNonNull(XmlFileLoader.getResourceAsStream(
328 "netconfMessages/getConfig.xml"));
330 Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
331 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
332 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
334 StringBuilder sb = new StringBuilder();
335 while (!sb.toString().endsWith("]]>]]>")) {
336 sb.append((char) inFromServer.read());
338 LOG.info(sb.toString());
340 outToServer.write(ByteStreams.toByteArray(clientHello));
341 outToServer.write("]]>]]>".getBytes());
343 // Thread.sleep(100);
344 outToServer.write(ByteStreams.toByteArray(getConfig));
345 outToServer.write("]]>]]>".getBytes());
348 sb = new StringBuilder();
349 while (!sb.toString().endsWith("]]>]]>")) {
350 sb.append((char) inFromServer.read());
352 LOG.info(sb.toString());
353 clientSocket.close();
358 * TestingNetconfClient based runnable.
360 public final class TestingNetconfClientRunnable implements Runnable {
362 @SuppressWarnings("checkstyle:IllegalCatch")
366 final TestingNetconfClient netconfClient =
367 new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
369 long sessionId = netconfClient.getSessionId();
370 LOG.info("Client with session id {}: hello exchanged", sessionId);
372 final NetconfMessage getMessage = XmlFileLoader
373 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
374 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
375 LOG.info("Client with session id {}: got result {}", sessionId, result);
377 checkState(NetconfMessageUtil.isErrorMessage(result) == false,
378 "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
379 + XmlUtil.toString(getMessage.getDocument()));
381 netconfClient.close();
382 LOG.info("Client with session id {}: ended", sessionId);
383 } catch (final Exception e) {
384 throw new IllegalStateException(Thread.currentThread().getName(), e);
388 private NetconfClientConfiguration getClientConfig() {
389 final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
390 b.withAddress(NETCONF_ADDRESS);
391 b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
393 b.withSessionListener(new SimpleNetconfClientSessionListener());
394 b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
395 NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));