2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.EventLoopGroup;
23 import io.netty.channel.nio.NioEventLoopGroup;
24 import io.netty.util.HashedWheelTimer;
25 import io.netty.util.concurrent.GlobalEventExecutor;
26 import java.io.DataOutputStream;
27 import java.io.InputStream;
28 import java.io.InputStreamReader;
29 import java.lang.management.ManagementFactory;
30 import java.net.InetSocketAddress;
31 import java.net.Socket;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.ThreadFactory;
42 import java.util.concurrent.atomic.AtomicLong;
43 import org.apache.commons.io.IOUtils;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.junit.runners.Parameterized;
51 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
52 import org.opendaylight.controller.netconf.api.NetconfMessage;
53 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
54 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
55 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
56 import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
57 import org.opendaylight.controller.netconf.client.TestingNetconfClient;
58 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
59 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
60 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
61 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
62 import org.opendaylight.controller.netconf.mapping.api.Capability;
63 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
64 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
65 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
66 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
67 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
68 import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
69 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
70 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
71 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
72 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
73 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76 import org.w3c.dom.Document;
78 @RunWith(Parameterized.class)
79 public class ConcurrentClientsTest {
80 private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
82 private static ExecutorService clientExecutor;
84 private static final int CONCURRENCY = 32;
85 private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
87 private int nettyThreads;
88 private Class<? extends Runnable> clientRunnable;
89 private Set<String> serverCaps;
91 public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
92 this.nettyThreads = nettyThreads;
93 this.clientRunnable = clientRunnable;
94 this.serverCaps = serverCaps;
97 @Parameterized.Parameters()
98 public static Collection<Object[]> data() {
99 return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
100 {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
101 // empty set of capabilities = only base 1.0 netconf capability
102 {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
103 {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
104 {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
105 {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
106 {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
110 private EventLoopGroup nettyGroup;
111 private NetconfClientDispatcher netconfClientDispatcher;
113 private DefaultCommitNotificationProducer commitNot;
115 HashedWheelTimer hashedWheelTimer;
116 private TestingNetconfOperation testingNetconfOperation;
118 public static SessionMonitoringService createMockedMonitoringService() {
119 SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
120 doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
121 doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
126 public static void setUpClientExecutor() {
127 clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
131 public Thread newThread(final Runnable r) {
132 Thread thread = new Thread(r);
133 thread.setName("client-" + i++);
134 thread.setDaemon(true);
141 public void setUp() throws Exception {
142 hashedWheelTimer = new HashedWheelTimer();
143 nettyGroup = new NioEventLoopGroup(nettyThreads);
144 netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
146 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
148 testingNetconfOperation = new TestingNetconfOperation();
149 factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
151 SessionIdProvider idProvider = new SessionIdProvider();
153 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
154 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
156 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
158 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
159 final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
161 ChannelFuture s = dispatch.createServer(netconfAddress);
166 public void tearDown(){
168 hashedWheelTimer.stop();
170 nettyGroup.shutdownGracefully().get();
171 } catch (InterruptedException | ExecutionException e) {
172 LOG.warn("Ignoring exception while cleaning up after test", e);
177 public static void tearDownClientExecutor() {
178 clientExecutor.shutdownNow();
181 @Test(timeout = CONCURRENCY * 1000)
182 public void testConcurrentClients() throws Exception {
184 List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
186 for (int i = 0; i < CONCURRENCY; i++) {
187 futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
190 for (Future<?> future : futures) {
193 } catch (InterruptedException e) {
194 throw new IllegalStateException(e);
195 } catch (ExecutionException e) {
196 LOG.error("Thread for testing client failed", e);
197 fail("Client failed: " + e.getMessage());
201 assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
204 public static Set<String> getOnlyExiServerCaps() {
205 return Sets.newHashSet(
206 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
207 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
211 public static Set<String> getOnlyChunkServerCaps() {
212 return Sets.newHashSet(
213 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
214 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
218 public Runnable getInstanceOfClientRunnable() throws Exception {
219 return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
223 * Responds to all operations except start-exi and counts all requests
225 private static class TestingNetconfOperation implements NetconfOperation {
227 private final AtomicLong counter = new AtomicLong();
230 public HandlingPriority canHandle(Document message) {
231 return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
232 HandlingPriority.CANNOT_HANDLE :
233 HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
237 public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
239 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
240 counter.getAndIncrement();
241 return XmlUtil.readXmlToDocument("<test/>");
242 } catch (Exception e) {
243 throw new RuntimeException(e);
247 public long getMessageCount() {
248 return counter.get();
253 * Hardcoded operation service factory
255 private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
256 private final NetconfOperation[] operations;
258 public TestingOperationServiceFactory(final NetconfOperation... operations) {
259 this.operations = operations;
263 public NetconfOperationService createService(String netconfSessionIdForReporting) {
264 return new NetconfOperationService() {
266 public Set<Capability> getCapabilities() {
267 return Collections.emptySet();
271 public Set<NetconfOperation> getNetconfOperations() {
272 return Sets.newHashSet(operations);
276 public void close() {}
282 * Pure socket based blocking client
284 public final class BlockingClientRunnable implements Runnable {
290 } catch (Exception e) {
291 throw new IllegalStateException(Thread.currentThread().getName(), e);
295 private void run2() throws Exception {
296 InputStream clientHello = checkNotNull(XmlFileLoader
297 .getResourceAsStream("netconfMessages/client_hello.xml"));
298 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
300 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
301 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
302 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
304 StringBuffer sb = new StringBuffer();
305 while (sb.toString().endsWith("]]>]]>") == false) {
306 sb.append((char) inFromServer.read());
308 LOG.info(sb.toString());
310 outToServer.write(IOUtils.toByteArray(clientHello));
311 outToServer.write("]]>]]>".getBytes());
313 // Thread.sleep(100);
314 outToServer.write(IOUtils.toByteArray(getConfig));
315 outToServer.write("]]>]]>".getBytes());
318 sb = new StringBuffer();
319 while (sb.toString().endsWith("]]>]]>") == false) {
320 sb.append((char) inFromServer.read());
322 LOG.info(sb.toString());
323 clientSocket.close();
328 * TestingNetconfClient based runnable
330 public final class TestingNetconfClientRunnable implements Runnable {
335 final TestingNetconfClient netconfClient =
336 new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
337 long sessionId = netconfClient.getSessionId();
338 LOG.info("Client with session id {}: hello exchanged", sessionId);
340 final NetconfMessage getMessage = XmlFileLoader
341 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
342 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
343 LOG.info("Client with session id {}: got result {}", sessionId, result);
345 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
346 "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
347 + XmlUtil.toString(getMessage.getDocument()));
349 netconfClient.close();
350 LOG.info("Client with session id {}: ended", sessionId);
351 } catch (final Exception e) {
352 throw new IllegalStateException(Thread.currentThread().getName(), e);
356 private NetconfClientConfiguration getClientConfig() {
357 final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
358 b.withAddress(netconfAddress);
359 b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
361 b.withSessionListener(new SimpleNetconfClientSessionListener());
362 b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
363 NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));