2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import java.io.DataOutputStream;
18 import java.io.InputStream;
19 import java.io.InputStreamReader;
20 import java.lang.management.ManagementFactory;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import javax.management.ObjectName;
29 import org.apache.commons.io.IOUtils;
30 import org.junit.After;
31 import org.junit.AfterClass;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.Mock;
35 import org.mockito.MockitoAnnotations;
36 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
37 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
38 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
39 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
40 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
41 import org.opendaylight.controller.netconf.api.NetconfMessage;
42 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
43 import org.opendaylight.controller.netconf.client.NetconfClient;
44 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
45 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
46 import org.opendaylight.controller.netconf.mapping.api.Capability;
47 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
48 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
49 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
50 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
51 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
52 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
53 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.w3c.dom.Document;
57 import static com.google.common.base.Preconditions.checkNotNull;
58 import static org.junit.Assert.fail;
59 import static org.mockito.Matchers.any;
60 import static org.mockito.Mockito.doReturn;
61 import static org.mockito.Mockito.mock;
63 public class ConcurrentClientsTest {
65 private static final int CONCURRENCY = 16;
66 private static EventLoopGroup nettyGroup = new NioEventLoopGroup();
67 public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher( nettyGroup, nettyGroup);
70 private YangStoreService yangStoreService;
72 private ConfigRegistryJMXClient jmxClient;
74 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
76 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
78 private DefaultCommitNotificationProducer commitNot;
79 private NetconfServerDispatcher dispatch;
83 public void setUp() throws Exception {
85 MockitoAnnotations.initMocks(this);
86 final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
87 doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
88 doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
89 doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
91 final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
92 doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
94 doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
97 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
98 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
100 SessionIdProvider idProvider = new SessionIdProvider();
101 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
102 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
104 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
106 NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
107 factoriesListener, commitNot, idProvider);
108 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
109 dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
111 ChannelFuture s = dispatch.createServer(netconfAddress);
116 public static void tearDownStatic() {
117 nettyGroup.shutdownGracefully();
120 private NetconfOperationServiceFactory mockOpF() {
121 return new NetconfOperationServiceFactory() {
123 public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
124 return new NetconfOperationService() {
126 public Set<Capability> getCapabilities() {
127 return Collections.emptySet();
131 public Set<NetconfOperation> getNetconfOperations() {
132 return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
134 public HandlingPriority canHandle(Document message) {
135 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
139 public Document handle(Document message, NetconfOperationRouter operationRouter)
140 throws NetconfDocumentedException {
142 return XmlUtil.readXmlToDocument("<test/>");
143 } catch (Exception e) {
144 throw new RuntimeException(e);
151 public Set<NetconfOperationFilter> getFilters() {
152 return Collections.emptySet();
156 public void close() {
164 public void cleanUp() throws Exception {
169 public void multipleClients() throws Exception {
170 List<TestingThread> threads = new ArrayList<>();
172 final int attempts = 5;
173 for (int i = 0; i < CONCURRENCY; i++) {
174 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
179 for (TestingThread thread : threads) {
181 if(thread.thrownException.isPresent()) {
182 Exception exception = thread.thrownException.get();
183 logger.error("Thread for testing client failed", exception);
184 fail("Client thread " + thread + " failed: " + exception.getMessage());
190 public void synchronizationTest() throws Exception {
191 new BlockingThread("foo").run2();
195 public void multipleBlockingClients() throws Exception {
196 List<BlockingThread> threads = new ArrayList<>();
197 for (int i = 0; i < CONCURRENCY; i++) {
198 BlockingThread thread = new BlockingThread(String.valueOf(i));
203 for (BlockingThread thread : threads) {
205 if(thread.thrownException.isPresent()) {
206 Exception exception = thread.thrownException.get();
207 logger.error("Thread for testing client failed", exception);
208 fail("Client thread " + thread + " failed: " + exception.getMessage());
213 class BlockingThread extends Thread {
214 private Optional<Exception> thrownException;
216 public BlockingThread(String name) {
217 super("client-" + name);
224 thrownException = Optional.absent();
225 } catch (Exception e) {
226 thrownException = Optional.of(e);
230 private void run2() throws Exception {
231 InputStream clientHello = checkNotNull(XmlFileLoader
232 .getResourceAsStream("netconfMessages/client_hello.xml"));
233 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
235 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
236 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
237 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
239 StringBuffer sb = new StringBuffer();
240 while (sb.toString().endsWith("]]>]]>") == false) {
241 sb.append((char) inFromServer.read());
243 logger.info(sb.toString());
245 outToServer.write(IOUtils.toByteArray(clientHello));
246 outToServer.write("]]>]]>".getBytes());
248 // Thread.sleep(100);
249 outToServer.write(IOUtils.toByteArray(getConfig));
250 outToServer.write("]]>]]>".getBytes());
253 sb = new StringBuffer();
254 while (sb.toString().endsWith("]]>]]>") == false) {
255 sb.append((char) inFromServer.read());
257 logger.info(sb.toString());
258 clientSocket.close();
262 class TestingThread extends Thread {
264 private final String clientId;
265 private final int attempts;
266 private Optional<Exception> thrownException;
268 TestingThread(String clientId, int attempts) {
269 this.clientId = clientId;
270 this.attempts = attempts;
271 setName("client-" + clientId);
277 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
278 long sessionId = netconfClient.getSessionId();
279 logger.info("Client with sessionid {} hello exchanged", sessionId);
281 final NetconfMessage getMessage = XmlFileLoader
282 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
283 NetconfMessage result = netconfClient.sendMessage(getMessage);
284 logger.info("Client with sessionid {} got result {}", sessionId, result);
285 netconfClient.close();
286 logger.info("Client with session id {} ended", sessionId);
287 thrownException = Optional.absent();
288 } catch (final Exception e) {
289 thrownException = Optional.of(e);