2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.util.HashedWheelTimer;
15 import org.apache.commons.io.IOUtils;
16 import org.junit.After;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
22 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
23 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
24 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
25 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
26 import org.opendaylight.controller.netconf.api.NetconfMessage;
27 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
28 import org.opendaylight.controller.netconf.client.NetconfClient;
29 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
30 import org.opendaylight.controller.netconf.mapping.api.Capability;
31 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
32 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
33 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
34 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
35 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
36 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
37 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.w3c.dom.Document;
42 import javax.management.ObjectName;
43 import javax.net.ssl.SSLContext;
44 import java.io.DataOutputStream;
45 import java.io.InputStream;
46 import java.io.InputStreamReader;
47 import java.lang.management.ManagementFactory;
48 import java.net.InetSocketAddress;
49 import java.net.Socket;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.List;
54 import java.util.concurrent.TimeUnit;
56 import static com.google.common.base.Preconditions.checkNotNull;
57 import static org.junit.Assert.assertTrue;
58 import static org.mockito.Matchers.any;
59 import static org.mockito.Mockito.doReturn;
60 import static org.mockito.Mockito.mock;
62 public class ConcurrentClientsTest {
// Loop bound for the number of client threads created in multipleClients() and
// multipleBlockingClients().
64 private static final int CONCURRENCY = 16;
// Stubbed in setUp() to hand out a mocked YangStoreSnapshot. No assignment is visible in this
// class — presumably injected by MockitoAnnotations.initMocks(this); TODO confirm.
66 private YangStoreService yangStoreService;
// Stubbed in setUp() (transaction client and config-bean lookup). Same note as above: no
// visible assignment, presumably Mockito-injected — TODO confirm.
68 private ConfigRegistryJMXClient jmxClient;
// Address handed to the dispatcher in setUp() and used by both client thread types to connect.
// NOTE(review): the port is hard-coded, so a run will likely fail if 8303 is already in use.
70 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
// Shared logger; package-private and static, used from the nested thread classes.
72 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
// Both created in setUp().
74 private DefaultCommitNotificationProducer commitNot;
75 private NetconfServerDispatcher dispatch;
// Builds the test fixture: stubs the yang-store and JMX mocks, registers the stub operation
// factory returned by mockOpF(), wires the server-side negotiator/listener factories and asks
// the dispatcher to create a server on netconfAddress.
78 public void setUp() throws Exception {
// Initialise the Mockito-managed fields of this test instance.
80 MockitoAnnotations.initMocks(this);
// The yang store service returns a mocked snapshot whose module maps are both empty.
81 final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
82 doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
83 doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
84 doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
// The JMX client returns a mocked transaction client for any ObjectName and reports no
// config beans.
86 final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
87 doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
89 doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
// Register the single stub operation service factory with the listener.
92 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
93 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
// One SessionIdProvider instance is shared by the negotiator factory and the listener factory.
95 SessionIdProvider idProvider = new SessionIdProvider();
// HashedWheelTimer's first argument is the tick duration (5000 ms here).
// NOTE(review): the timer is created inline and not stored, so nothing visible in this class
// can stop it afterwards — confirm the factory (or something else) owns its shutdown.
96 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
97 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
// The commit notification producer is bound to the platform MBean server.
99 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
101 NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
102 factoriesListener, commitNot, idProvider);
// No SSLContext is supplied (Optional.absent()) — presumably this yields a plain, non-TLS
// server; TODO confirm against NetconfServerDispatcher.
103 dispatch = new NetconfServerDispatcher(Optional.<SSLContext> absent(), serverNegotiatorFactory, listenerFactory);
// The returned ChannelFuture is held in a local variable only.
105 ChannelFuture s = dispatch.createServer(netconfAddress);
// Returns a hand-written stub NetconfOperationServiceFactory. Every service it creates
// advertises no capabilities and no filters, and exposes exactly one operation that answers
// with a fixed <test/> document.
109 private NetconfOperationServiceFactory mockOpF() {
110 return new NetconfOperationServiceFactory() {
// Both session-id arguments are ignored; a fresh anonymous service is returned per call.
112 public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
113 return new NetconfOperationService() {
115 public Set<Capability> getCapabilities() {
116 return Collections.emptySet();
// A new mutable HashSet holding the single stub operation is built on every call.
120 public Set<NetconfOperation> getNetconfOperations() {
121 return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
// The message is not inspected: the priority is always derived from Integer.MAX_VALUE.
123 public HandlingPriority canHandle(Document message) {
124 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
// Ignores both the request and the router and returns the parsed literal "<test/>".
128 public Document handle(Document message, NetconfOperationRouter operationRouter)
129 throws NetconfDocumentedException {
131 return XmlUtil.readXmlToDocument("<test/>");
// Any failure while building the reply is rethrown unchecked with the cause preserved.
132 } catch (Exception e) {
133 throw new RuntimeException(e);
140 public Set<NetconfOperationFilter> getFilters() {
141 return Collections.emptySet();
145 public void close() {
// Fixture cleanup hook; may propagate any Exception. The name and the org.junit.After import
// suggest it runs after each test — TODO confirm.
153 public void cleanUp() throws Exception {
// Exercises the server with NetconfClient-based clients: builds one TestingThread per index
// below CONCURRENCY and then requires every thread in the list to have reported success.
159 public void multipleClients() throws Exception {
160 List<TestingThread> threads = new ArrayList<>();
// Passed to every TestingThread constructor.
162 final int attempts = 5;
163 for (int i = 0; i < CONCURRENCY; i++) {
// The loop index doubles as the client id (and therefore the thread-name suffix).
164 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
169 for (TestingThread thread : threads) {
// NOTE(review): TestingThread.success is a boxed Boolean; if it is still null when read
// here, unboxing throws NullPointerException instead of producing an assertion failure.
171 assertTrue(thread.success);
// Runs a single raw-socket exchange by calling run2() directly on the calling thread; the
// BlockingThread object is never started as a thread here, so any exception from run2()
// propagates straight out of this test method.
176 public void synchronizationTest() throws Exception {
177 new BlockingThread("foo").run2();
// Raw-socket counterpart of multipleClients(): builds one BlockingThread per index below
// CONCURRENCY and then requires every thread in the list to have reported success.
181 public void multipleBlockingClients() throws Exception {
182 List<BlockingThread> threads = new ArrayList<>();
183 for (int i = 0; i < CONCURRENCY; i++) {
// The loop index becomes the thread-name suffix ("client-<i>").
184 BlockingThread thread = new BlockingThread(String.valueOf(i));
189 for (BlockingThread thread : threads) {
// NOTE(review): if BlockingThread.success is a boxed Boolean like TestingThread.success,
// a null value here would surface as NullPointerException rather than an assertion
// failure — confirm the field's type.
191 assertTrue(thread.success);
// Client thread that talks to the server over a plain java.net.Socket with blocking I/O,
// framing each message with the "]]>]]>" delimiter by hand. Being a non-static inner class,
// it reads netconfAddress from the enclosing test instance.
195 class BlockingThread extends Thread {
// Names the thread "client-<name>" via the Thread(String) constructor.
198 public BlockingThread(String name) {
199 super("client-" + name);
// Failures are rethrown unchecked with the original exception kept as the cause.
207 } catch (Exception e) {
209 throw new RuntimeException(e);
// Performs one complete exchange: read the server greeting, send the client hello, send a
// get-config request, read the reply, close the socket.
// NOTE(review): the socket is closed only on the success path; an exception anywhere before
// clientSocket.close() leaves it open. The two resource streams are never closed in the
// visible code. try-with-resources would cover both.
213 private void run2() throws Exception {
// Both request payloads are classpath resources; checkNotNull fails fast if one is missing.
214 InputStream clientHello = checkNotNull(XmlFileLoader
215 .getResourceAsStream("netconfMessages/client_hello.xml"));
216 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
218 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
219 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
// NOTE(review): no charset is given, so the platform default is used for decoding.
220 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
// Read the server's first message one character at a time until the buffer ends with the
// end-of-message delimiter.
// NOTE(review): read() returns -1 at end of stream; that value is cast to char and appended
// without a check, so this loop cannot terminate if the server closes the connection
// before sending the delimiter. Re-materialising the whole buffer with toString() for
// every character also makes the loop quadratic in the message length.
222 StringBuffer sb = new StringBuffer();
223 while (sb.toString().endsWith("]]>]]>") == false) {
224 sb.append((char) inFromServer.read());
226 logger.info(sb.toString());
// Send the client hello followed by the delimiter.
// NOTE(review): getBytes() without a charset uses the platform default encoding.
228 outToServer.write(IOUtils.toByteArray(clientHello));
229 outToServer.write("]]>]]>".getBytes());
231 // Thread.sleep(100);
// Send the get-config request followed by the delimiter.
232 outToServer.write(IOUtils.toByteArray(getConfig));
233 outToServer.write("]]>]]>".getBytes());
// Read the reply with the same delimiter-terminated loop (same EOF caveat as above).
236 sb = new StringBuffer();
237 while (sb.toString().endsWith("]]>]]>") == false) {
238 sb.append((char) inFromServer.read());
240 logger.info(sb.toString());
241 clientSocket.close();
// Client thread that uses NetconfClient (rather than a raw socket) to connect to
// netconfAddress, send one get-config message and log the result.
245 class TestingThread extends Thread {
247 private final String clientId;
// NOTE(review): stored by the constructor, but no use of it is visible in the client code
// below — confirm it actually drives a retry loop.
248 private final int attempts;
// Boxed Boolean, so it starts out null; multipleClients() reads it via assertTrue.
249 private Boolean success;
// Stores both arguments and names the thread "client-<clientId>" via setName (BlockingThread
// achieves the same through the Thread(String) super constructor).
251 TestingThread(String clientId, int attempts) {
252 this.clientId = clientId;
253 this.attempts = attempts;
254 setName("client-" + clientId);
// A new client is constructed for this thread; the log message below indicates the hello
// exchange is expected to be complete once the session id is available.
260 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress);
261 long sessionId = netconfClient.getSessionId();
262 logger.info("Client with sessionid {} hello exchanged", sessionId);
// Load the get-config request from the classpath and send it; the returned message is only
// logged, not validated.
264 final NetconfMessage getMessage = XmlFileLoader
265 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
266 NetconfMessage result = netconfClient.sendMessage(getMessage);
267 logger.info("Client with sessionid {} got result {}", sessionId, result);
// NOTE(review): close() is reached only when everything above succeeded; an exception from
// sendMessage leaves the client open.
268 netconfClient.close();
269 logger.info("Client with session id {} ended", sessionId);
// Failures are rethrown unchecked with the original exception kept as the cause.
271 } catch (final Exception e) {
273 throw new RuntimeException(e);