2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.mockito.MockitoAnnotations;
23 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
24 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
25 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
26 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
27 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
28 import org.opendaylight.controller.netconf.api.NetconfMessage;
29 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
30 import org.opendaylight.controller.netconf.client.NetconfClient;
31 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
32 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
33 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
34 import org.opendaylight.controller.netconf.mapping.api.Capability;
35 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
36 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
37 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
38 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
39 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
40 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
41 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.w3c.dom.Document;
46 import javax.management.ObjectName;
47 import java.io.DataOutputStream;
48 import java.io.InputStream;
49 import java.io.InputStreamReader;
50 import java.lang.management.ManagementFactory;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.util.ArrayList;
54 import java.util.Collections;
55 import java.util.List;
58 import static com.google.common.base.Preconditions.checkNotNull;
59 import static org.junit.Assert.fail;
60 import static org.mockito.Matchers.any;
61 import static org.mockito.Mockito.doNothing;
62 import static org.mockito.Mockito.doReturn;
63 import static org.mockito.Mockito.mock;
// Test fixture: setUp() starts a Netconf server on netconfAddress and the test methods drive it
// with CONCURRENCY client threads (TestingThread via NetconfClient, BlockingThread via a raw socket).
65 public class ConcurrentClientsTest {
// Number of client threads created by multipleClients() and multipleBlockingClients().
67 private static final int CONCURRENCY = 16;
// Created in setUp() as a NioEventLoopGroup, handed to both the client and the server dispatcher,
// and shut down in tearDown().
68 private EventLoopGroup nettyGroup;
// Built in setUp(); passed to every NetconfClient created by TestingThread.
69 private NetconfClientDispatcher netconfClientDispatcher;
// Stubbed in setUp() to return a mocked YangStoreSnapshot.
// NOTE(review): no initializer or @Mock annotation is visible in this excerpt — presumably
// populated by MockitoAnnotations.initMocks(this); confirm.
72 private YangStoreService yangStoreService;
// Stubbed in setUp() (getConfigTransactionClient / lookupConfigBeans).
// NOTE(review): same as above — presumably a Mockito-injected mock; confirm.
74 private ConfigRegistryJMXClient jmxClient;
// Fixed loopback address and port: the server is created on it and every client connects to it.
76 private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
78 static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
// Created in setUp() from the platform MBean server; passed to the server session listener factory.
80 private DefaultCommitNotificationProducer commitNot;
// Server dispatcher created in setUp(); used there to create the server on netconfAddress.
81 private NetconfServerDispatcher dispatch;
// onSessionUp/onSessionDown are stubbed to do nothing in setUp().
// NOTE(review): presumably a Mockito-injected mock as well; confirm.
84 private SessionMonitoringService monitoring;
// Timer handed to the server negotiator factory; created in setUp(), stopped in tearDown().
86 HashedWheelTimer hashedWheelTimer;
/**
 * Builds the whole fixture: stubs the mocked collaborators, creates the shared Netty event loop
 * group and the client dispatcher, wires the server-side factories around the single operation
 * service from mockOpF(), and creates the server on netconfAddress.
 *
 * @throws Exception declared by the signature; no visible line documents which call may throw
 */
89 public void setUp() throws Exception {
// Initialises the Mockito-annotated fields of this instance.
91 MockitoAnnotations.initMocks(this);
// The yang store hands out a snapshot whose module map is empty.
92 final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
93 doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
94 doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
// The JMX client returns a mocked transaction client for any ObjectName and reports no config beans.
96 final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
97 doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
99 doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
// One event loop group serves client and server alike; it is passed for both group arguments.
102 nettyGroup = new NioEventLoopGroup();
// NOTE(review): 5000 is presumably a timeout in milliseconds — confirm against NetconfClientDispatcher.
103 netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, 5000);
// The only registered operation service factory is the stub from mockOpF().
105 NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
106 factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
// The same idProvider instance is shared by the negotiator factory and the listener factory below.
108 SessionIdProvider idProvider = new SessionIdProvider();
109 hashedWheelTimer = new HashedWheelTimer();
// NOTE(review): the trailing 5000 is presumably a negotiation timeout in milliseconds — confirm.
110 NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
111 hashedWheelTimer, factoriesListener, idProvider, 5000);
113 commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
// Session up/down notifications are accepted and ignored.
115 doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
116 doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
118 NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
119 factoriesListener, commitNot, idProvider, monitoring);
120 NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
121 dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
// NOTE(review): the returned future is not used on any visible line — confirm the server is
// actually up (e.g. the future is awaited) before the tests start connecting.
123 ChannelFuture s = dispatch.createServer(netconfAddress);
/**
 * Releases the resources created in setUp(): stops the wheel timer and asks the shared event
 * loop group to shut down.
 */
128 public void tearDown(){
129 hashedWheelTimer.stop();
// NOTE(review): the result of shutdownGracefully() is not used here, so this method does not
// wait for the shutdown to finish — confirm that is acceptable given the fixed port.
130 nettyGroup.shutdownGracefully();
/**
 * Creates the stub operation service factory registered with the server in setUp().
 *
 * Every service it creates advertises no capabilities and no filters, and exposes a single
 * operation that claims the highest handling priority and answers with a {@code <test/>} document.
 *
 * @return a factory whose createService ignores both session id arguments
 */
133 private NetconfOperationServiceFactory mockOpF() {
134 return new NetconfOperationServiceFactory() {
// Both parameters are unused: each call returns a fresh service with identical behaviour.
136 public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
137 return new NetconfOperationService() {
139 public Set<Capability> getCapabilities() {
140 return Collections.emptySet();
144 public Set<NetconfOperation> getNetconfOperations() {
// A mutable set holding exactly one anonymous operation.
145 return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
// Requests priority Integer.MAX_VALUE for every message, whatever its content.
147 public HandlingPriority canHandle(Document message) {
148 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
// Ignores the request and the router; the reply is always the parsed "<test/>" document.
152 public Document handle(Document message, NetconfOperationRouter operationRouter)
153 throws NetconfDocumentedException {
155 return XmlUtil.readXmlToDocument("<test/>");
// Any failure while parsing the constant reply is rethrown unchecked with the cause preserved.
156 } catch (Exception e) {
157 throw new RuntimeException(e);
164 public Set<NetconfOperationFilter> getFilters() {
165 return Collections.emptySet();
// NOTE(review): no statement of close() is visible in this excerpt — presumably a no-op; confirm.
169 public void close() {
// NOTE(review): only the signature is visible in this excerpt. The name and position suggest a
// second per-test cleanup hook alongside tearDown() — confirm what it releases (tearDown() does
// not touch commitNot).
177 public void cleanUp() throws Exception {
/**
 * Creates CONCURRENCY TestingThread clients and fails the test if any of them recorded an
 * exception.
 *
 * @throws Exception declared by the signature
 */
182 public void multipleClients() throws Exception {
183 List<TestingThread> threads = new ArrayList<>();
// Passed to every TestingThread constructor.
185 final int attempts = 5;
186 for (int i = 0; i < CONCURRENCY; i++) {
// The loop index doubles as the client id (and thread name suffix).
187 TestingThread thread = new TestingThread(String.valueOf(i), attempts);
// NOTE(review): the statements that start each thread, add it to `threads` and later join it are
// not visible in this excerpt — confirm every thread is joined before thrownException is read.
192 for (TestingThread thread : threads) {
// thrownException is present only when the thread's work ended in the catch branch.
194 if(thread.thrownException.isPresent()) {
195 Exception exception = thread.thrownException.get();
196 logger.error("Thread for testing client failed", exception);
// The first failing thread aborts the test; later threads are not inspected.
197 fail("Client thread " + thread + " failed: " + exception.getMessage());
/**
 * Runs a single raw-socket exchange on the calling thread.
 *
 * @throws Exception whatever run2() throws, propagated directly to the test runner
 */
203 public void synchronizationTest() throws Exception {
// run2() is invoked directly rather than through start(), so no extra thread is involved and
// failures are not captured in thrownException.
204 new BlockingThread("foo").run2();
/**
 * Creates CONCURRENCY BlockingThread clients and fails the test if any of them recorded an
 * exception.
 *
 * @throws Exception declared by the signature
 */
208 public void multipleBlockingClients() throws Exception {
209 List<BlockingThread> threads = new ArrayList<>();
210 for (int i = 0; i < CONCURRENCY; i++) {
// The loop index becomes the thread name suffix ("client-<i>").
211 BlockingThread thread = new BlockingThread(String.valueOf(i));
// NOTE(review): the statements that start each thread, add it to `threads` and later join it are
// not visible in this excerpt — confirm every thread is joined before thrownException is read.
216 for (BlockingThread thread : threads) {
// thrownException is present only when the thread's work ended in the catch branch.
218 if(thread.thrownException.isPresent()) {
219 Exception exception = thread.thrownException.get();
220 logger.error("Thread for testing client failed", exception);
// The first failing thread aborts the test; later threads are not inspected.
221 fail("Client thread " + thread + " failed: " + exception.getMessage());
// Client thread that talks to the server over a plain java.net.Socket (see run2()) instead of
// going through NetconfClient.
226 class BlockingThread extends Thread {
// Outcome of the thread's work: absent on success, the caught exception on failure. The field
// has no initializer, so it is null until one of the assignments below has run.
227 private Optional<Exception> thrownException;
/**
 * @param name suffix of the thread name; the thread is named "client-" + name
 */
229 public BlockingThread(String name) {
230 super("client-" + name);
// NOTE(review): the method header and the try body that precede the next lines are not visible
// in this excerpt — presumably run() invoking run2(); confirm.
// Success path: record that nothing was thrown.
237 thrownException = Optional.absent();
238 } catch (Exception e) {
// Failure path: keep the exception so the test method can report it.
239 thrownException = Optional.of(e);
243 private void run2() throws Exception {
244 InputStream clientHello = checkNotNull(XmlFileLoader
245 .getResourceAsStream("netconfMessages/client_hello.xml"));
246 InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
248 Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
249 DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
250 InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
252 StringBuffer sb = new StringBuffer();
253 while (sb.toString().endsWith("]]>]]>") == false) {
254 sb.append((char) inFromServer.read());
256 logger.info(sb.toString());
258 outToServer.write(IOUtils.toByteArray(clientHello));
259 outToServer.write("]]>]]>".getBytes());
261 // Thread.sleep(100);
262 outToServer.write(IOUtils.toByteArray(getConfig));
263 outToServer.write("]]>]]>".getBytes());
266 sb = new StringBuffer();
267 while (sb.toString().endsWith("]]>]]>") == false) {
268 sb.append((char) inFromServer.read());
270 logger.info(sb.toString());
271 clientSocket.close();
// Client thread that performs one request/response round trip through NetconfClient and records
// whether it succeeded.
275 class TestingThread extends Thread {
// Passed to the NetconfClient constructor and used as the thread name suffix.
277 private final String clientId;
// NOTE(review): stored by the constructor, but no visible line of this excerpt reads it —
// confirm whether a retry loop was intended.
278 private final int attempts;
// Outcome of the thread's work: absent on success, the caught exception on failure. The field
// has no initializer, so it is null until one of the assignments below has run.
279 private Optional<Exception> thrownException;
/**
 * @param clientId id handed to NetconfClient; the thread is named "client-" + clientId
 * @param attempts stored in the attempts field
 */
281 TestingThread(String clientId, int attempts) {
282 this.clientId = clientId;
283 this.attempts = attempts;
284 setName("client-" + clientId);
// NOTE(review): the method header and the opening of the try block that precede the next lines
// are not visible in this excerpt — presumably run(); confirm.
// Connects to the server address using the dispatcher shared by all clients.
290 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
291 long sessionId = netconfClient.getSessionId();
292 logger.info("Client with sessionid {} hello exchanged", sessionId);
// Sends the get-config fixture and logs whatever sendMessage returns.
294 final NetconfMessage getMessage = XmlFileLoader
295 .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
296 NetconfMessage result = netconfClient.sendMessage(getMessage);
297 logger.info("Client with sessionid {} got result {}", sessionId, result);
// The client is closed only on this success path; an exception thrown above skips the close.
298 netconfClient.close();
299 logger.info("Client with session id {} ended", sessionId);
300 thrownException = Optional.absent();
301 } catch (final Exception e) {
// Failure path: keep the exception so the test method can report it.
302 thrownException = Optional.of(e);