/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.netconf.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
17 import java.io.DataOutputStream;
18 import java.io.InputStream;
19 import java.io.InputStreamReader;
20 import java.lang.management.ManagementFactory;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
28 import org.apache.commons.io.IOUtils;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
33 import org.opendaylight.controller.netconf.api.NetconfMessage;
34 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
35 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
36 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
37 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
38 import org.opendaylight.controller.netconf.mapping.api.Capability;
39 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
40 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
44 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
45 import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
46 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
52 import com.google.common.base.Optional;
53 import com.google.common.collect.Sets;
55 import io.netty.channel.ChannelFuture;
56 import io.netty.channel.EventLoopGroup;
57 import io.netty.channel.nio.NioEventLoopGroup;
58 import io.netty.util.HashedWheelTimer;
60 public class ConcurrentClientsTest {
62 private static final int CONCURRENCY = 16;
63 private EventLoopGroup nettyGroup;
64 private NetconfClientDispatcher netconfClientDispatcher;
66 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
68 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
70 private DefaultCommitNotificationProducer commitNot;
71 private NetconfServerDispatcher dispatch;
75 HashedWheelTimer hashedWheelTimer;
77 public static SessionMonitoringService createMockedMonitoringService() {
78 SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
79 doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
80 doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
85 public void setUp() throws Exception {
87 nettyGroup = new NioEventLoopGroup();
88 NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
89 netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
91 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
92 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
94 SessionIdProvider idProvider = new SessionIdProvider();
95 hashedWheelTimer = new HashedWheelTimer();
97 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
98 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
100 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
102 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
103 dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
105 ChannelFuture s = dispatch.createServer(netconfAddress);
110 public void tearDown(){
111 hashedWheelTimer.stop();
112 nettyGroup.shutdownGracefully();
115 private NetconfOperationServiceFactory mockOpF() {
116 return new NetconfOperationServiceFactory() {
118 public NetconfOperationService createService(String netconfSessionIdForReporting) {
119 return new NetconfOperationService() {
121 public Set<Capability> getCapabilities() {
122 return Collections.emptySet();
126 public Set<NetconfOperation> getNetconfOperations() {
127 return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
129 public HandlingPriority canHandle(Document message) {
130 return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
131 HandlingPriority.CANNOT_HANDLE :
132 HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
136 public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
138 return XmlUtil.readXmlToDocument("<test/>");
139 } catch (Exception e) {
140 throw new RuntimeException(e);
147 public void close() {
155 public void cleanUp() throws Exception {
159 @Test(timeout = 30 * 1000)
160 public void multipleClients() throws Exception {
161 List<TestingThread> threads = new ArrayList<>();
163 final int attempts = 5;
164 for (int i = 0; i < CONCURRENCY; i++) {
165 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
170 for (TestingThread thread : threads) {
172 if(thread.thrownException.isPresent()) {
173 Exception exception = thread.thrownException.get();
174 logger.error("Thread for testing client failed", exception);
175 fail("Client thread " + thread + " failed: " + exception.getMessage());
180 @Test(timeout = 30 * 1000)
181 public void synchronizationTest() throws Exception {
182 new BlockingThread("foo").run2();
185 @Test(timeout = 30 * 1000)
186 public void multipleBlockingClients() throws Exception {
187 List<BlockingThread> threads = new ArrayList<>();
188 for (int i = 0; i < CONCURRENCY; i++) {
189 BlockingThread thread = new BlockingThread(String.valueOf(i));
194 for (BlockingThread thread : threads) {
196 if(thread.thrownException.isPresent()) {
197 Exception exception = thread.thrownException.get();
198 logger.error("Thread for testing client failed", exception);
199 fail("Client thread " + thread + " failed: " + exception.getMessage());
204 class BlockingThread extends Thread {
205 private Optional<Exception> thrownException;
207 public BlockingThread(String name) {
208 super("client-" + name);
215 thrownException = Optional.absent();
216 } catch (Exception e) {
217 thrownException = Optional.of(e);
221 private void run2() throws Exception {
222 InputStream clientHello = checkNotNull(XmlFileLoader
223 .getResourceAsStream("netconfMessages/client_hello.xml"));
224 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
226 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
227 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
228 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
230 StringBuffer sb = new StringBuffer();
231 while (sb.toString().endsWith("]]>]]>") == false) {
232 sb.append((char) inFromServer.read());
234 logger.info(sb.toString());
236 outToServer.write(IOUtils.toByteArray(clientHello));
237 outToServer.write("]]>]]>".getBytes());
239 // Thread.sleep(100);
240 outToServer.write(IOUtils.toByteArray(getConfig));
241 outToServer.write("]]>]]>".getBytes());
244 sb = new StringBuffer();
245 while (sb.toString().endsWith("]]>]]>") == false) {
246 sb.append((char) inFromServer.read());
248 logger.info(sb.toString());
249 clientSocket.close();
253 class TestingThread extends Thread {
255 private final String clientId;
256 private final int attempts;
257 private Optional<Exception> thrownException;
259 TestingThread(String clientId, int attempts) {
260 this.clientId = clientId;
261 this.attempts = attempts;
262 setName("client-" + clientId);
268 final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher);
269 long sessionId = netconfClient.getSessionId();
270 logger.info("Client with sessionid {} hello exchanged", sessionId);
272 final NetconfMessage getMessage = XmlFileLoader
273 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
274 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
275 logger.info("Client with sessionid {} got result {}", sessionId, result);
276 netconfClient.close();
277 logger.info("Client with session id {} ended", sessionId);
278 thrownException = Optional.absent();
279 } catch (final Exception e) {
280 thrownException = Optional.of(e);