Merge "Add Enqueue validation check in FlowConfig"
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
23 import org.opendaylight.controller.netconf.api.NetconfMessage;
24 import org.opendaylight.controller.netconf.client.NetconfClient;
25 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
26 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
27 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
28 import org.opendaylight.controller.netconf.mapping.api.Capability;
29 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
30 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
31 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
32 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
33 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
34 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
35 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
36 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.w3c.dom.Document;
40
41 import java.io.DataOutputStream;
42 import java.io.InputStream;
43 import java.io.InputStreamReader;
44 import java.lang.management.ManagementFactory;
45 import java.net.InetSocketAddress;
46 import java.net.Socket;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.List;
50 import java.util.Set;
51
52 import static com.google.common.base.Preconditions.checkNotNull;
53 import static org.junit.Assert.fail;
54 import static org.mockito.Matchers.any;
55 import static org.mockito.Mockito.doNothing;
56 import static org.mockito.MockitoAnnotations.initMocks;
57
58 public class ConcurrentClientsTest {
59
60     private static final int CONCURRENCY = 16;
61     private EventLoopGroup nettyGroup;
62     private NetconfClientDispatcher netconfClientDispatcher;
63
64     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
65
66     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
67
68     private DefaultCommitNotificationProducer commitNot;
69     private NetconfServerDispatcher dispatch;
70
71     @Mock
72     private SessionMonitoringService monitoring;
73
74     HashedWheelTimer hashedWheelTimer;
75
76     @Before
77     public void setUp() throws Exception {
78         initMocks(this);
79         nettyGroup = new NioEventLoopGroup();
80         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
81         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
82
83         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
84         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
85
86         SessionIdProvider idProvider = new SessionIdProvider();
87         hashedWheelTimer = new HashedWheelTimer();
88         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
89                 hashedWheelTimer, factoriesListener, idProvider, 5000);
90
91         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
92
93         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
94         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
95
96         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
97                 factoriesListener, commitNot, idProvider, monitoring);
98         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
99         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
100
101         ChannelFuture s = dispatch.createServer(netconfAddress);
102         s.await();
103     }
104
105     @After
106     public void tearDown(){
107         hashedWheelTimer.stop();
108         nettyGroup.shutdownGracefully();
109     }
110
111     private NetconfOperationServiceFactory mockOpF() {
112         return new NetconfOperationServiceFactory() {
113             @Override
114             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
115                 return new NetconfOperationService() {
116                     @Override
117                     public Set<Capability> getCapabilities() {
118                         return Collections.emptySet();
119                     }
120
121                     @Override
122                     public Set<NetconfOperation> getNetconfOperations() {
123                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
124                             @Override
125                             public HandlingPriority canHandle(Document message) {
126                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
127                             }
128
129                             @Override
130                             public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
131                                 try {
132                                     return XmlUtil.readXmlToDocument("<test/>");
133                                 } catch (Exception e) {
134                                     throw new RuntimeException(e);
135                                 }
136                             }
137                         });
138                     }
139
140                     @Override
141                     public void close() {
142                     }
143                 };
144             }
145         };
146     }
147
148     @After
149     public void cleanUp() throws Exception {
150         commitNot.close();
151     }
152
153     @Test
154     public void multipleClients() throws Exception {
155         List<TestingThread> threads = new ArrayList<>();
156
157         final int attempts = 5;
158         for (int i = 0; i < CONCURRENCY; i++) {
159             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
160             threads.add(thread);
161             thread.start();
162         }
163
164         for (TestingThread thread : threads) {
165             thread.join();
166             if(thread.thrownException.isPresent()) {
167                 Exception exception = thread.thrownException.get();
168                 logger.error("Thread for testing client failed", exception);
169                 fail("Client thread " + thread + " failed: " + exception.getMessage());
170             }
171         }
172     }
173
174     @Test
175     public void synchronizationTest() throws Exception {
176         new BlockingThread("foo").run2();
177     }
178
179     @Test
180     public void multipleBlockingClients() throws Exception {
181         List<BlockingThread> threads = new ArrayList<>();
182         for (int i = 0; i < CONCURRENCY; i++) {
183             BlockingThread thread = new BlockingThread(String.valueOf(i));
184             threads.add(thread);
185             thread.start();
186         }
187
188         for (BlockingThread thread : threads) {
189             thread.join();
190             if(thread.thrownException.isPresent()) {
191                 Exception exception = thread.thrownException.get();
192                 logger.error("Thread for testing client failed", exception);
193                 fail("Client thread " + thread + " failed: " + exception.getMessage());
194             }
195         }
196     }
197
198     class BlockingThread extends Thread {
199         private Optional<Exception> thrownException;
200
201         public BlockingThread(String name) {
202             super("client-" + name);
203         }
204
205         @Override
206         public void run() {
207             try {
208                 run2();
209                 thrownException = Optional.absent();
210             } catch (Exception e) {
211                 thrownException = Optional.of(e);
212             }
213         }
214
215         private void run2() throws Exception {
216             InputStream clientHello = checkNotNull(XmlFileLoader
217                     .getResourceAsStream("netconfMessages/client_hello.xml"));
218             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
219
220             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
221             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
222             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
223
224             StringBuffer sb = new StringBuffer();
225             while (sb.toString().endsWith("]]>]]>") == false) {
226                 sb.append((char) inFromServer.read());
227             }
228             logger.info(sb.toString());
229
230             outToServer.write(IOUtils.toByteArray(clientHello));
231             outToServer.write("]]>]]>".getBytes());
232             outToServer.flush();
233             // Thread.sleep(100);
234             outToServer.write(IOUtils.toByteArray(getConfig));
235             outToServer.write("]]>]]>".getBytes());
236             outToServer.flush();
237             Thread.sleep(100);
238             sb = new StringBuffer();
239             while (sb.toString().endsWith("]]>]]>") == false) {
240                 sb.append((char) inFromServer.read());
241             }
242             logger.info(sb.toString());
243             clientSocket.close();
244         }
245     }
246
247     class TestingThread extends Thread {
248
249         private final String clientId;
250         private final int attempts;
251         private Optional<Exception> thrownException;
252
253         TestingThread(String clientId, int attempts) {
254             this.clientId = clientId;
255             this.attempts = attempts;
256             setName("client-" + clientId);
257         }
258
259         @Override
260         public void run() {
261             try {
262                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
263                 long sessionId = netconfClient.getSessionId();
264                 logger.info("Client with sessionid {} hello exchanged", sessionId);
265
266                 final NetconfMessage getMessage = XmlFileLoader
267                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
268                 NetconfMessage result = netconfClient.sendMessage(getMessage);
269                 logger.info("Client with sessionid {} got result {}", sessionId, result);
270                 netconfClient.close();
271                 logger.info("Client with session id {} ended", sessionId);
272                 thrownException = Optional.absent();
273             } catch (final Exception e) {
274                 thrownException = Optional.of(e);
275             }
276         }
277     }
278 }