2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
23 import org.opendaylight.controller.netconf.api.NetconfMessage;
24 import org.opendaylight.controller.netconf.client.NetconfClient;
25 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
26 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
27 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
28 import org.opendaylight.controller.netconf.mapping.api.Capability;
29 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
30 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
31 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
32 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
33 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
34 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
35 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
36 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.w3c.dom.Document;
41 import java.io.DataOutputStream;
42 import java.io.InputStream;
43 import java.io.InputStreamReader;
44 import java.lang.management.ManagementFactory;
45 import java.net.InetSocketAddress;
46 import java.net.Socket;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.List;
52 import static com.google.common.base.Preconditions.checkNotNull;
53 import static org.junit.Assert.fail;
54 import static org.mockito.Matchers.any;
55 import static org.mockito.Mockito.doNothing;
56 import static org.mockito.MockitoAnnotations.initMocks;
/**
 * Tests that run several NETCONF clients against a single server at the same time.
 *
 * NOTE(review): this listing appears to be missing lines (annotations, closing
 * braces and some statements), so the comments here describe only what is visible.
 */
58 public class ConcurrentClientsTest {
// Upper bound of the thread-creation loops in multipleClients and multipleBlockingClients.
60 private static final int CONCURRENCY = 16;
// Event loop group created in setUp; the same instance is handed to both the
// client dispatcher and the server dispatcher, and shut down in tearDown.
61 private EventLoopGroup nettyGroup;
// Dispatcher that TestingThread passes to each NetconfClient it constructs.
62 private NetconfClientDispatcher netconfClientDispatcher;
// Address the server is created on in setUp and that both client thread types connect to.
64 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
// Shared logger; package-private so the nested thread classes can use it.
66 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
// Built in setUp from the platform MBean server and passed to the session listener factory.
68 private DefaultCommitNotificationProducer commitNot;
// Server dispatcher on which setUp calls createServer(netconfAddress).
69 private NetconfServerDispatcher dispatch;
// Stubbed with doNothing() in setUp.
// NOTE(review): presumably a Mockito @Mock initialised via initMocks — the
// annotation and the initMocks call are not visible in this listing; confirm.
72 private SessionMonitoringService monitoring;
// Timer given to the server negotiator factory in setUp and stopped in tearDown.
74 HashedWheelTimer hashedWheelTimer;
/**
 * Builds the client dispatcher and the server-side objects, then creates the
 * server on netconfAddress.
 *
 * NOTE(review): some original lines are not visible in this listing (the
 * numbering skips 78, 82, 85, ...), so this description may be incomplete.
 *
 * @throws Exception declared by the signature; the visible lines do not show which call throws
 */
77 public void setUp() throws Exception {
// One event loop group is reused for every group argument below (client and server).
79 nettyGroup = new NioEventLoopGroup();
80 NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
// NOTE(review): 5000 is presumably a timeout in milliseconds — confirm against NetconfClientDispatcher.
81 netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
// Register the stub operation service factory from mockOpF() with the server's factory listener.
83 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
84 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
86 SessionIdProvider idProvider = new SessionIdProvider();
87 hashedWheelTimer = new HashedWheelTimer();
// NOTE(review): the trailing 5000 is presumably also a timeout in milliseconds — confirm.
88 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
89 hashedWheelTimer, factoriesListener, idProvider, 5000);
91 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
// Make the monitoring callbacks no-ops for any NetconfServerSession argument.
93 doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
94 doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
96 NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
97 factoriesListener, commitNot, idProvider, monitoring);
98 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
99 dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
// NOTE(review): the returned ChannelFuture is not used in the visible lines, so
// the test does not visibly wait for the server to finish binding — confirm
// whether a hidden line awaits it.
101 ChannelFuture s = dispatch.createServer(netconfAddress);
/**
 * Stops the timer and requests shutdown of the event loop group created in setUp.
 *
 * NOTE(review): the future returned by shutdownGracefully() is not awaited in the
 * visible lines, and no visible line closes the server channel created in setUp;
 * confirm whether the fixed port can still be bound when the next test starts.
 */
106 public void tearDown(){
107 hashedWheelTimer.stop();
108 nettyGroup.shutdownGracefully();
/**
 * Creates a stub NetconfOperationServiceFactory from nested anonymous classes.
 * Every service it creates reports no capabilities and exposes one operation
 * that answers with a document parsed from the literal "<test/>".
 *
 * NOTE(review): the "try {" line, closing braces and any annotations are not
 * visible in this listing.
 *
 * @return the stub factory that setUp registers with the factory listener
 */
111 private NetconfOperationServiceFactory mockOpF() {
112 return new NetconfOperationServiceFactory() {
// The session id argument is not used by the visible lines.
114 public NetconfOperationService createService(String netconfSessionIdForReporting) {
115 return new NetconfOperationService() {
117 public Set<Capability> getCapabilities() {
118 return Collections.emptySet();
122 public Set<NetconfOperation> getNetconfOperations() {
// A set containing exactly one anonymous operation.
123 return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
// Returns the same priority, built from Integer.MAX_VALUE, for every message.
125 public HandlingPriority canHandle(Document message) {
126 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
// Ignores both arguments in the visible lines and returns a freshly parsed "<test/>" document.
130 public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
132 return XmlUtil.readXmlToDocument("<test/>");
// Any failure is rethrown unchecked with the original exception kept as the cause.
133 } catch (Exception e) {
134 throw new RuntimeException(e);
// NOTE(review): no body is visible for close() in this listing.
141 public void close() {
// NOTE(review): only the signature of cleanUp is visible in this listing; its
// body and any JUnit annotation are missing, so its behaviour cannot be
// described from here.
149 public void cleanUp() throws Exception {
/**
 * Creates CONCURRENCY TestingThread instances and fails the test if any of
 * them recorded an exception.
 *
 * NOTE(review): the lines that would add each thread to the list, start it and
 * join it are not visible in this listing (numbering skips 160-163 and 165) —
 * confirm they exist.
 *
 * @throws Exception declared by the signature
 */
154 public void multipleClients() throws Exception {
155 List<TestingThread> threads = new ArrayList<>();
// Passed to each TestingThread as its "attempts" constructor argument.
157 final int attempts = 5;
158 for (int i = 0; i < CONCURRENCY; i++) {
159 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
164 for (TestingThread thread : threads) {
// NOTE(review): thrownException is only visibly assigned inside the thread's
// own try/catch, so reading it before the thread has finished would see null
// — confirm a join happens first.
166 if(thread.thrownException.isPresent()) {
167 Exception exception = thread.thrownException.get();
168 logger.error("Thread for testing client failed", exception);
// fail() reports the first failing thread; later threads are not inspected.
169 fail("Client thread " + thread + " failed: " + exception.getMessage());
/**
 * Runs one BlockingThread exchange by calling run2() directly on the calling
 * thread; no new thread is started by the visible line, and any exception from
 * run2() propagates to the caller.
 *
 * @throws Exception whatever run2() throws
 */
175 public void synchronizationTest() throws Exception {
176 new BlockingThread("foo").run2();
/**
 * Creates CONCURRENCY BlockingThread instances and fails the test if any of
 * them recorded an exception.
 *
 * NOTE(review): the lines that would add each thread to the list, start it and
 * join it are not visible in this listing (numbering skips 184-187 and 189) —
 * confirm they exist.
 *
 * @throws Exception declared by the signature
 */
180 public void multipleBlockingClients() throws Exception {
181 List<BlockingThread> threads = new ArrayList<>();
182 for (int i = 0; i < CONCURRENCY; i++) {
183 BlockingThread thread = new BlockingThread(String.valueOf(i));
188 for (BlockingThread thread : threads) {
// NOTE(review): thrownException has no initialiser on its declaration, so
// this dereference relies on the thread having already executed its
// try/catch — confirm a join happens first.
190 if(thread.thrownException.isPresent()) {
191 Exception exception = thread.thrownException.get();
192 logger.error("Thread for testing client failed", exception);
// fail() reports the first failing thread; later threads are not inspected.
193 fail("Client thread " + thread + " failed: " + exception.getMessage());
/**
 * Client thread that talks to the server over a plain java.net.Socket: it reads
 * one delimited message, writes client_hello.xml and getConfig.xml each followed
 * by the "]]>]]>" delimiter, then reads one more delimited message.
 *
 * NOTE(review): the run() header, the "try {" line and closing braces are not
 * visible in this listing.
 */
198 class BlockingThread extends Thread {
// Outcome of the thread body: absent on success, otherwise the caught exception.
// Has no initialiser, so it is null until one of the assignments below runs.
199 private Optional<Exception> thrownException;
// Names the thread "client-" followed by the given name.
201 public BlockingThread(String name) {
202 super("client-" + name);
// NOTE(review): the next three lines appear to be the tail of run(); presumably
// the hidden try body calls run2() — confirm.
209 thrownException = Optional.absent();
210 } catch (Exception e) {
211 thrownException = Optional.of(e);
/**
 * Performs the blocking socket exchange described on the class.
 *
 * NOTE(review): original lines 232, 236 and 237 are not visible in this
 * listing; statements may be missing between the writes and the second read.
 *
 * @throws Exception on any I/O or resource-loading failure
 */
215 private void run2() throws Exception {
// checkNotNull makes a missing classpath resource fail here rather than at the write.
216 InputStream clientHello = checkNotNull(XmlFileLoader
217 .getResourceAsStream("netconfMessages/client_hello.xml"));
218 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
220 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
221 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
// NOTE(review): no charset is given, so decoding uses the platform default.
222 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
// Read one character at a time until the buffer ends with the "]]>]]>" delimiter.
224 StringBuffer sb = new StringBuffer();
// NOTE(review): toString() copies the whole buffer on every iteration.
225 while (sb.toString().endsWith("]]>]]>") == false) {
// NOTE(review): read() returns -1 at end of stream; the value is cast to char
// without a check, so if the peer closes before sending the delimiter this
// loop keeps appending and never terminates.
226 sb.append((char) inFromServer.read());
228 logger.info(sb.toString());
// Send the client hello followed by the delimiter.
230 outToServer.write(IOUtils.toByteArray(clientHello));
// NOTE(review): getBytes() without a charset uses the platform default encoding.
231 outToServer.write("]]>]]>".getBytes());
233 // Thread.sleep(100);
// Send the get-config request followed by the delimiter.
234 outToServer.write(IOUtils.toByteArray(getConfig));
235 outToServer.write("]]>]]>".getBytes());
// Read the next delimited message with the same loop (same -1 caveat as above).
238 sb = new StringBuffer();
239 while (sb.toString().endsWith("]]>]]>") == false) {
240 sb.append((char) inFromServer.read());
242 logger.info(sb.toString());
// NOTE(review): the socket is closed only when everything above succeeded (no
// try/finally is visible), and no visible line closes clientHello or getConfig.
243 clientSocket.close();
/**
 * Client thread that uses NetconfClient: it opens a client against
 * netconfAddress, sends the getConfig.xml message once, logs the result and
 * closes the client.
 *
 * NOTE(review): the run() header, the "try {" line and closing braces are not
 * visible in this listing.
 */
247 class TestingThread extends Thread {
// Passed to the NetconfClient constructor and used in the thread name.
249 private final String clientId;
// NOTE(review): stored by the constructor, but no visible line reads it —
// confirm whether a hidden loop uses it or the field is dead.
250 private final int attempts;
// Outcome of the thread body: absent on success, otherwise the caught exception.
// Has no initialiser, so it is null until one of the assignments below runs.
251 private Optional<Exception> thrownException;
// Stores both arguments and names the thread "client-" followed by clientId.
253 TestingThread(String clientId, int attempts) {
254 this.clientId = clientId;
255 this.attempts = attempts;
256 setName("client-" + clientId);
// NOTE(review): the lines below appear to be the body of run(), inside a try
// block whose opening line is hidden.
262 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
263 long sessionId = netconfClient.getSessionId();
264 logger.info("Client with sessionid {} hello exchanged", sessionId);
266 final NetconfMessage getMessage = XmlFileLoader
267 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
268 NetconfMessage result = netconfClient.sendMessage(getMessage);
269 logger.info("Client with sessionid {} got result {}", sessionId, result);
// NOTE(review): close() is reached only when the calls above succeed; no
// finally block is visible, so a failed send would leave the client open.
270 netconfClient.close();
271 logger.info("Client with session id {} ended", sessionId);
272 thrownException = Optional.absent();
// Any failure is recorded for the test method to inspect instead of being rethrown.
273 } catch (final Exception e) {
274 thrownException = Optional.of(e);