2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
18 import java.io.DataOutputStream;
19 import java.io.InputStream;
20 import java.io.InputStreamReader;
21 import java.lang.management.ManagementFactory;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
29 import javax.management.ObjectName;
31 import org.apache.commons.io.IOUtils;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
38 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
39 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
40 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
41 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
42 import org.opendaylight.controller.netconf.api.NetconfMessage;
43 import org.opendaylight.controller.netconf.client.NetconfClient;
44 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
45 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
46 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
47 import org.opendaylight.controller.netconf.mapping.api.Capability;
48 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
49 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
50 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
51 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
52 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
53 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
54 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
55 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.w3c.dom.Document;
60 import com.google.common.base.Optional;
61 import com.google.common.collect.Sets;
63 import io.netty.channel.ChannelFuture;
64 import io.netty.channel.EventLoopGroup;
65 import io.netty.channel.nio.NioEventLoopGroup;
66 import io.netty.util.HashedWheelTimer;
68 public class ConcurrentClientsTest {
70 private static final int CONCURRENCY = 16;
71 private EventLoopGroup nettyGroup;
72 private NetconfClientDispatcher netconfClientDispatcher;
75 private YangStoreService yangStoreService;
77 private ConfigRegistryJMXClient jmxClient;
79 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
81 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
83 private DefaultCommitNotificationProducer commitNot;
84 private NetconfServerDispatcher dispatch;
87 private SessionMonitoringService monitoring;
89 HashedWheelTimer hashedWheelTimer;
92 public void setUp() throws Exception {
94 MockitoAnnotations.initMocks(this);
95 final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
96 doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
97 doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
99 final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
100 doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
102 doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
105 nettyGroup = new NioEventLoopGroup();
106 NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
107 netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
109 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
110 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
112 SessionIdProvider idProvider = new SessionIdProvider();
113 hashedWheelTimer = new HashedWheelTimer();
114 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
115 hashedWheelTimer, factoriesListener, idProvider, 5000);
117 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
119 doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
120 doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
122 NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
123 factoriesListener, commitNot, idProvider, monitoring);
124 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
125 dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
127 ChannelFuture s = dispatch.createServer(netconfAddress);
132 public void tearDown(){
133 hashedWheelTimer.stop();
134 nettyGroup.shutdownGracefully();
137 private NetconfOperationServiceFactory mockOpF() {
138 return new NetconfOperationServiceFactory() {
140 public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
141 return new NetconfOperationService() {
143 public Set<Capability> getCapabilities() {
144 return Collections.emptySet();
148 public Set<NetconfOperation> getNetconfOperations() {
149 return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
151 public HandlingPriority canHandle(Document message) {
152 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
156 public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
158 return XmlUtil.readXmlToDocument("<test/>");
159 } catch (Exception e) {
160 throw new RuntimeException(e);
167 public void close() {
175 public void cleanUp() throws Exception {
180 public void multipleClients() throws Exception {
181 List<TestingThread> threads = new ArrayList<>();
183 final int attempts = 5;
184 for (int i = 0; i < CONCURRENCY; i++) {
185 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
190 for (TestingThread thread : threads) {
192 if(thread.thrownException.isPresent()) {
193 Exception exception = thread.thrownException.get();
194 logger.error("Thread for testing client failed", exception);
195 fail("Client thread " + thread + " failed: " + exception.getMessage());
201 public void synchronizationTest() throws Exception {
202 new BlockingThread("foo").run2();
206 public void multipleBlockingClients() throws Exception {
207 List<BlockingThread> threads = new ArrayList<>();
208 for (int i = 0; i < CONCURRENCY; i++) {
209 BlockingThread thread = new BlockingThread(String.valueOf(i));
214 for (BlockingThread thread : threads) {
216 if(thread.thrownException.isPresent()) {
217 Exception exception = thread.thrownException.get();
218 logger.error("Thread for testing client failed", exception);
219 fail("Client thread " + thread + " failed: " + exception.getMessage());
224 class BlockingThread extends Thread {
225 private Optional<Exception> thrownException;
227 public BlockingThread(String name) {
228 super("client-" + name);
235 thrownException = Optional.absent();
236 } catch (Exception e) {
237 thrownException = Optional.of(e);
241 private void run2() throws Exception {
242 InputStream clientHello = checkNotNull(XmlFileLoader
243 .getResourceAsStream("netconfMessages/client_hello.xml"));
244 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
246 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
247 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
248 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
250 StringBuffer sb = new StringBuffer();
251 while (sb.toString().endsWith("]]>]]>") == false) {
252 sb.append((char) inFromServer.read());
254 logger.info(sb.toString());
256 outToServer.write(IOUtils.toByteArray(clientHello));
257 outToServer.write("]]>]]>".getBytes());
259 // Thread.sleep(100);
260 outToServer.write(IOUtils.toByteArray(getConfig));
261 outToServer.write("]]>]]>".getBytes());
264 sb = new StringBuffer();
265 while (sb.toString().endsWith("]]>]]>") == false) {
266 sb.append((char) inFromServer.read());
268 logger.info(sb.toString());
269 clientSocket.close();
273 class TestingThread extends Thread {
275 private final String clientId;
276 private final int attempts;
277 private Optional<Exception> thrownException;
279 TestingThread(String clientId, int attempts) {
280 this.clientId = clientId;
281 this.attempts = attempts;
282 setName("client-" + clientId);
288 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
289 long sessionId = netconfClient.getSessionId();
290 logger.info("Client with sessionid {} hello exchanged", sessionId);
292 final NetconfMessage getMessage = XmlFileLoader
293 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
294 NetconfMessage result = netconfClient.sendMessage(getMessage);
295 logger.info("Client with sessionid {} got result {}", sessionId, result);
296 netconfClient.close();
297 logger.info("Client with session id {} ended", sessionId);
298 thrownException = Optional.absent();
299 } catch (final Exception e) {
300 thrownException = Optional.of(e);