Merge "BUG-732 Fix sal-netconf-connector unable to download schemas"
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
16
17 import java.io.DataOutputStream;
18 import java.io.InputStream;
19 import java.io.InputStreamReader;
20 import java.lang.management.ManagementFactory;
21 import java.net.InetSocketAddress;
22 import java.net.Socket;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Set;
27
28 import org.apache.commons.io.IOUtils;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
33 import org.opendaylight.controller.netconf.api.NetconfMessage;
34 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
35 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
36 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
37 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
38 import org.opendaylight.controller.netconf.mapping.api.Capability;
39 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
40 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
44 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
45 import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
46 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
51
52 import com.google.common.base.Optional;
53 import com.google.common.collect.Sets;
54
55 import io.netty.channel.ChannelFuture;
56 import io.netty.channel.EventLoopGroup;
57 import io.netty.channel.nio.NioEventLoopGroup;
58 import io.netty.util.HashedWheelTimer;
59
60 public class ConcurrentClientsTest {
61
62     private static final int CONCURRENCY = 16;
63     private EventLoopGroup nettyGroup;
64     private NetconfClientDispatcher netconfClientDispatcher;
65
66     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
67
68     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
69
70     private DefaultCommitNotificationProducer commitNot;
71     private NetconfServerDispatcher dispatch;
72
73
74
75     HashedWheelTimer hashedWheelTimer;
76
77     public static SessionMonitoringService createMockedMonitoringService() {
78         SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
79         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
80         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
81         return monitoring;
82     }
83
84     @Before
85     public void setUp() throws Exception {
86
87         nettyGroup = new NioEventLoopGroup();
88         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
89         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
90
91         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
92         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
93
94         SessionIdProvider idProvider = new SessionIdProvider();
95         hashedWheelTimer = new HashedWheelTimer();
96
97         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
98                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
99
100         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
101
102         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
103         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
104
105         ChannelFuture s = dispatch.createServer(netconfAddress);
106         s.await();
107     }
108
109     @After
110     public void tearDown(){
111         hashedWheelTimer.stop();
112         nettyGroup.shutdownGracefully();
113     }
114
115     private NetconfOperationServiceFactory mockOpF() {
116         return new NetconfOperationServiceFactory() {
117             @Override
118             public NetconfOperationService createService(String netconfSessionIdForReporting) {
119                 return new NetconfOperationService() {
120                     @Override
121                     public Set<Capability> getCapabilities() {
122                         return Collections.emptySet();
123                     }
124
125                     @Override
126                     public Set<NetconfOperation> getNetconfOperations() {
127                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
128                             @Override
129                             public HandlingPriority canHandle(Document message) {
130                                 return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
131                                         HandlingPriority.CANNOT_HANDLE :
132                                         HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
133                             }
134
135                             @Override
136                             public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
137                                 try {
138                                     return XmlUtil.readXmlToDocument("<test/>");
139                                 } catch (Exception e) {
140                                     throw new RuntimeException(e);
141                                 }
142                             }
143                         });
144                     }
145
146                     @Override
147                     public void close() {
148                     }
149                 };
150             }
151         };
152     }
153
154     @After
155     public void cleanUp() throws Exception {
156         commitNot.close();
157     }
158
159     @Test(timeout = 30 * 1000)
160     public void multipleClients() throws Exception {
161         List<TestingThread> threads = new ArrayList<>();
162
163         final int attempts = 5;
164         for (int i = 0; i < CONCURRENCY; i++) {
165             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
166             threads.add(thread);
167             thread.start();
168         }
169
170         for (TestingThread thread : threads) {
171             thread.join();
172             if(thread.thrownException.isPresent()) {
173                 Exception exception = thread.thrownException.get();
174                 logger.error("Thread for testing client failed", exception);
175                 fail("Client thread " + thread + " failed: " + exception.getMessage());
176             }
177         }
178     }
179
180     @Test(timeout = 30 * 1000)
181     public void synchronizationTest() throws Exception {
182         new BlockingThread("foo").run2();
183     }
184
185     @Test(timeout = 30 * 1000)
186     public void multipleBlockingClients() throws Exception {
187         List<BlockingThread> threads = new ArrayList<>();
188         for (int i = 0; i < CONCURRENCY; i++) {
189             BlockingThread thread = new BlockingThread(String.valueOf(i));
190             threads.add(thread);
191             thread.start();
192         }
193
194         for (BlockingThread thread : threads) {
195             thread.join();
196             if(thread.thrownException.isPresent()) {
197                 Exception exception = thread.thrownException.get();
198                 logger.error("Thread for testing client failed", exception);
199                 fail("Client thread " + thread + " failed: " + exception.getMessage());
200             }
201         }
202     }
203
204     class BlockingThread extends Thread {
205         private Optional<Exception> thrownException;
206
207         public BlockingThread(String name) {
208             super("client-" + name);
209         }
210
211         @Override
212         public void run() {
213             try {
214                 run2();
215                 thrownException = Optional.absent();
216             } catch (Exception e) {
217                 thrownException = Optional.of(e);
218             }
219         }
220
221         private void run2() throws Exception {
222             InputStream clientHello = checkNotNull(XmlFileLoader
223                     .getResourceAsStream("netconfMessages/client_hello.xml"));
224             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
225
226             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
227             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
228             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
229
230             StringBuffer sb = new StringBuffer();
231             while (sb.toString().endsWith("]]>]]>") == false) {
232                 sb.append((char) inFromServer.read());
233             }
234             logger.info(sb.toString());
235
236             outToServer.write(IOUtils.toByteArray(clientHello));
237             outToServer.write("]]>]]>".getBytes());
238             outToServer.flush();
239             // Thread.sleep(100);
240             outToServer.write(IOUtils.toByteArray(getConfig));
241             outToServer.write("]]>]]>".getBytes());
242             outToServer.flush();
243             Thread.sleep(100);
244             sb = new StringBuffer();
245             while (sb.toString().endsWith("]]>]]>") == false) {
246                 sb.append((char) inFromServer.read());
247             }
248             logger.info(sb.toString());
249             clientSocket.close();
250         }
251     }
252
253     class TestingThread extends Thread {
254
255         private final String clientId;
256         private final int attempts;
257         private Optional<Exception> thrownException;
258
259         TestingThread(String clientId, int attempts) {
260             this.clientId = clientId;
261             this.attempts = attempts;
262             setName("client-" + clientId);
263         }
264
265         @Override
266         public void run() {
267             try {
268                 final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher);
269                 long sessionId = netconfClient.getSessionId();
270                 logger.info("Client with sessionid {} hello exchanged", sessionId);
271
272                 final NetconfMessage getMessage = XmlFileLoader
273                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
274                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
275                 logger.info("Client with sessionid {} got result {}", sessionId, result);
276                 netconfClient.close();
277                 logger.info("Client with session id {} ended", sessionId);
278                 thrownException = Optional.absent();
279             } catch (final Exception e) {
280                 thrownException = Optional.of(e);
281             }
282         }
283     }
284 }