2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
18 import com.google.common.base.Preconditions;
19 import java.io.DataOutputStream;
20 import java.io.InputStream;
21 import java.io.InputStreamReader;
22 import java.lang.management.ManagementFactory;
23 import java.net.InetSocketAddress;
24 import java.net.Socket;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
30 import java.util.concurrent.atomic.AtomicLong;
31 import org.apache.commons.io.IOUtils;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Ignore;
35 import org.junit.Test;
36 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
37 import org.opendaylight.controller.netconf.api.NetconfMessage;
38 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
39 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
40 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
41 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
42 import org.opendaylight.controller.netconf.mapping.api.Capability;
43 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
45 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
46 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
47 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
48 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
49 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
50 import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
51 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
52 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.w3c.dom.Document;
57 import com.google.common.base.Optional;
58 import com.google.common.collect.Sets;
60 import io.netty.channel.ChannelFuture;
61 import io.netty.channel.EventLoopGroup;
62 import io.netty.channel.nio.NioEventLoopGroup;
63 import io.netty.util.HashedWheelTimer;
65 public class ConcurrentClientsTest {
67 private static final int CONCURRENCY = 64;
68 public static final int NETTY_THREADS = 4;
70 private EventLoopGroup nettyGroup;
71 private NetconfClientDispatcher netconfClientDispatcher;
73 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
75 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
77 private DefaultCommitNotificationProducer commitNot;
79 HashedWheelTimer hashedWheelTimer;
80 private TestingNetconfOperation testingNetconfOperation;
82 public static SessionMonitoringService createMockedMonitoringService() {
83 SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
84 doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
85 doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
89 // TODO refactor and test with different configurations
92 public void setUp() throws Exception {
94 nettyGroup = new NioEventLoopGroup(NETTY_THREADS);
95 NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
96 netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
98 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
99 testingNetconfOperation = new TestingNetconfOperation();
100 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF(testingNetconfOperation));
102 SessionIdProvider idProvider = new SessionIdProvider();
103 hashedWheelTimer = new HashedWheelTimer();
105 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
106 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
108 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
110 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
111 final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
113 ChannelFuture s = dispatch.createServer(netconfAddress);
118 public void tearDown(){
119 hashedWheelTimer.stop();
120 nettyGroup.shutdownGracefully();
123 private NetconfOperationServiceFactory mockOpF(final NetconfOperation... operations) {
124 return new TestingOperationServiceFactory(operations);
128 public void cleanUp() throws Exception {
132 @Test(timeout = 30 * 1000)
133 public void multipleClients() throws Exception {
134 List<TestingThread> threads = new ArrayList<>();
136 final int attempts = 5;
137 for (int i = 0; i < CONCURRENCY; i++) {
138 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
143 for (TestingThread thread : threads) {
145 if(thread.thrownException.isPresent()) {
146 Exception exception = thread.thrownException.get();
147 logger.error("Thread for testing client failed", exception);
148 fail("Client thread " + thread + " failed: " + exception.getMessage());
152 assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
156 * Cannot handle CHUNK, make server configurable
159 @Test(timeout = 30 * 1000)
160 public void synchronizationTest() throws Exception {
161 new BlockingThread("foo").run2();
165 * Cannot handle CHUNK, make server configurable
168 @Test(timeout = 30 * 1000)
169 public void multipleBlockingClients() throws Exception {
170 List<BlockingThread> threads = new ArrayList<>();
171 for (int i = 0; i < CONCURRENCY; i++) {
172 BlockingThread thread = new BlockingThread(String.valueOf(i));
177 for (BlockingThread thread : threads) {
179 if(thread.thrownException.isPresent()) {
180 Exception exception = thread.thrownException.get();
181 logger.error("Thread for testing client failed", exception);
182 fail("Client thread " + thread + " failed: " + exception.getMessage());
187 private static class TestingNetconfOperation implements NetconfOperation {
189 private final AtomicLong counter = new AtomicLong();
192 public HandlingPriority canHandle(Document message) {
193 return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
194 HandlingPriority.CANNOT_HANDLE :
195 HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
199 public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
201 logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
202 counter.getAndIncrement();
203 return XmlUtil.readXmlToDocument("<test/>");
204 } catch (Exception e) {
205 throw new RuntimeException(e);
209 public long getMessageCount() {
210 return counter.get();
214 private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
215 private final NetconfOperation[] operations;
217 public TestingOperationServiceFactory(final NetconfOperation... operations) {
218 this.operations = operations;
222 public NetconfOperationService createService(String netconfSessionIdForReporting) {
223 return new NetconfOperationService() {
225 public Set<Capability> getCapabilities() {
226 return Collections.emptySet();
230 public Set<NetconfOperation> getNetconfOperations() {
231 return Sets.<NetconfOperation> newHashSet(operations);
235 public void close() {
241 class BlockingThread extends Thread {
242 private Optional<Exception> thrownException;
244 public BlockingThread(String name) {
245 super("client-" + name);
252 thrownException = Optional.absent();
253 } catch (Exception e) {
254 thrownException = Optional.of(e);
258 private void run2() throws Exception {
259 InputStream clientHello = checkNotNull(XmlFileLoader
260 .getResourceAsStream("netconfMessages/client_hello.xml"));
261 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
263 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
264 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
265 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
267 StringBuffer sb = new StringBuffer();
268 while (sb.toString().endsWith("]]>]]>") == false) {
269 sb.append((char) inFromServer.read());
271 logger.info(sb.toString());
273 outToServer.write(IOUtils.toByteArray(clientHello));
274 outToServer.write("]]>]]>".getBytes());
276 // Thread.sleep(100);
277 outToServer.write(IOUtils.toByteArray(getConfig));
278 outToServer.write("]]>]]>".getBytes());
281 sb = new StringBuffer();
282 while (sb.toString().endsWith("]]>]]>") == false) {
283 sb.append((char) inFromServer.read());
285 logger.info(sb.toString());
286 clientSocket.close();
290 class TestingThread extends Thread {
292 private final String clientId;
293 private final int attempts;
294 private Optional<Exception> thrownException;
296 TestingThread(String clientId, int attempts) {
297 this.clientId = clientId;
298 this.attempts = attempts;
299 setName("client-" + clientId);
305 final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher);
306 long sessionId = netconfClient.getSessionId();
307 logger.info("Client with sessionid {} hello exchanged", sessionId);
309 final NetconfMessage getMessage = XmlFileLoader
310 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
311 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
312 logger.info("Client with sessionid {} got result {}", sessionId, result);
314 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
315 "Received error response: " + XmlUtil.toString(result.getDocument()) +
316 " to request: " + XmlUtil.toString(getMessage.getDocument()));
318 netconfClient.close();
319 logger.info("Client with session id {} ended", sessionId);
320 thrownException = Optional.absent();
321 } catch (final Exception e) {
322 thrownException = Optional.of(e);