Merge "Fixed bug when Binding-Aware Data Change Listeners were not triggered."
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.mockito.MockitoAnnotations;
23 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
24 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
25 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
26 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
27 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
28 import org.opendaylight.controller.netconf.api.NetconfMessage;
29 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
30 import org.opendaylight.controller.netconf.client.NetconfClient;
31 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
32 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
33 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
34 import org.opendaylight.controller.netconf.mapping.api.Capability;
35 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
36 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
37 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
38 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
39 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
40 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
41 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
42 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.w3c.dom.Document;
46
47 import javax.management.ObjectName;
48 import java.io.DataOutputStream;
49 import java.io.InputStream;
50 import java.io.InputStreamReader;
51 import java.lang.management.ManagementFactory;
52 import java.net.InetSocketAddress;
53 import java.net.Socket;
54 import java.util.ArrayList;
55 import java.util.Collections;
56 import java.util.List;
57 import java.util.Set;
58
59 import static com.google.common.base.Preconditions.checkNotNull;
60 import static org.junit.Assert.fail;
61 import static org.mockito.Matchers.any;
62 import static org.mockito.Mockito.doNothing;
63 import static org.mockito.Mockito.doReturn;
64 import static org.mockito.Mockito.mock;
65
66 public class ConcurrentClientsTest {
67
    // Number of client threads started by multipleClients() and multipleBlockingClients().
    private static final int CONCURRENCY = 16;
    // Event loop group created in setUp(); handed to both the client dispatcher and the server dispatcher.
    private EventLoopGroup nettyGroup;
    // Dispatcher passed to every NetconfClient created by TestingThread.
    private NetconfClientDispatcher netconfClientDispatcher;

    // Mock initialised and stubbed in setUp().
    @Mock
    private YangStoreService yangStoreService;
    // Mock initialised and stubbed in setUp().
    @Mock
    private ConfigRegistryJMXClient jmxClient;

    // Fixed loopback address the test server is created on and the clients connect to.
    private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);

    // Shared logger; also used by the nested client thread classes.
    static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);

    // Created in setUp() and closed in cleanUp().
    private DefaultCommitNotificationProducer commitNot;
    // Server dispatcher created in setUp(); used there to create the server on netconfAddress.
    private NetconfServerDispatcher dispatch;

    // Mock whose onSessionUp/onSessionDown callbacks are stubbed to do nothing in setUp().
    @Mock
    private SessionMonitoringService monitoring;

    // Timer handed to the server session negotiator factory in setUp() and stopped in tearDown().
    HashedWheelTimer hashedWheelTimer;
88
89     @Before
90     public void setUp() throws Exception {
91         { // init mocks
92             MockitoAnnotations.initMocks(this);
93             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
94             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
95             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
96
97             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
98             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
99
100             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
101         }
102
103         nettyGroup = new NioEventLoopGroup();
104         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
105         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
106
107         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
108         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
109
110         SessionIdProvider idProvider = new SessionIdProvider();
111         hashedWheelTimer = new HashedWheelTimer();
112         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
113                 hashedWheelTimer, factoriesListener, idProvider, 5000);
114
115         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
116
117         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
118         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
119
120         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
121                 factoriesListener, commitNot, idProvider, monitoring);
122         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
123         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
124
125         ChannelFuture s = dispatch.createServer(netconfAddress);
126         s.await();
127     }
128
129     @After
130     public void tearDown(){
131         hashedWheelTimer.stop();
132         nettyGroup.shutdownGracefully();
133     }
134
135     private NetconfOperationServiceFactory mockOpF() {
136         return new NetconfOperationServiceFactory() {
137             @Override
138             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
139                 return new NetconfOperationService() {
140                     @Override
141                     public Set<Capability> getCapabilities() {
142                         return Collections.emptySet();
143                     }
144
145                     @Override
146                     public Set<NetconfOperation> getNetconfOperations() {
147                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
148                             @Override
149                             public HandlingPriority canHandle(Document message) {
150                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
151                             }
152
153                             @Override
154                             public Document handle(Document message, NetconfOperationRouter operationRouter)
155                                     throws NetconfDocumentedException {
156                                 try {
157                                     return XmlUtil.readXmlToDocument("<test/>");
158                                 } catch (Exception e) {
159                                     throw new RuntimeException(e);
160                                 }
161                             }
162                         });
163                     }
164
165                     @Override
166                     public Set<NetconfOperationFilter> getFilters() {
167                         return Collections.emptySet();
168                     }
169
170                     @Override
171                     public void close() {
172                     }
173                 };
174             }
175         };
176     }
177
178     @After
179     public void cleanUp() throws Exception {
180         commitNot.close();
181     }
182
183     @Test
184     public void multipleClients() throws Exception {
185         List<TestingThread> threads = new ArrayList<>();
186
187         final int attempts = 5;
188         for (int i = 0; i < CONCURRENCY; i++) {
189             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
190             threads.add(thread);
191             thread.start();
192         }
193
194         for (TestingThread thread : threads) {
195             thread.join();
196             if(thread.thrownException.isPresent()) {
197                 Exception exception = thread.thrownException.get();
198                 logger.error("Thread for testing client failed", exception);
199                 fail("Client thread " + thread + " failed: " + exception.getMessage());
200             }
201         }
202     }
203
204     @Test
205     public void synchronizationTest() throws Exception {
206         new BlockingThread("foo").run2();
207     }
208
209     @Test
210     public void multipleBlockingClients() throws Exception {
211         List<BlockingThread> threads = new ArrayList<>();
212         for (int i = 0; i < CONCURRENCY; i++) {
213             BlockingThread thread = new BlockingThread(String.valueOf(i));
214             threads.add(thread);
215             thread.start();
216         }
217
218         for (BlockingThread thread : threads) {
219             thread.join();
220             if(thread.thrownException.isPresent()) {
221                 Exception exception = thread.thrownException.get();
222                 logger.error("Thread for testing client failed", exception);
223                 fail("Client thread " + thread + " failed: " + exception.getMessage());
224             }
225         }
226     }
227
228     class BlockingThread extends Thread {
229         private Optional<Exception> thrownException;
230
231         public BlockingThread(String name) {
232             super("client-" + name);
233         }
234
235         @Override
236         public void run() {
237             try {
238                 run2();
239                 thrownException = Optional.absent();
240             } catch (Exception e) {
241                 thrownException = Optional.of(e);
242             }
243         }
244
245         private void run2() throws Exception {
246             InputStream clientHello = checkNotNull(XmlFileLoader
247                     .getResourceAsStream("netconfMessages/client_hello.xml"));
248             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
249
250             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
251             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
252             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
253
254             StringBuffer sb = new StringBuffer();
255             while (sb.toString().endsWith("]]>]]>") == false) {
256                 sb.append((char) inFromServer.read());
257             }
258             logger.info(sb.toString());
259
260             outToServer.write(IOUtils.toByteArray(clientHello));
261             outToServer.write("]]>]]>".getBytes());
262             outToServer.flush();
263             // Thread.sleep(100);
264             outToServer.write(IOUtils.toByteArray(getConfig));
265             outToServer.write("]]>]]>".getBytes());
266             outToServer.flush();
267             Thread.sleep(100);
268             sb = new StringBuffer();
269             while (sb.toString().endsWith("]]>]]>") == false) {
270                 sb.append((char) inFromServer.read());
271             }
272             logger.info(sb.toString());
273             clientSocket.close();
274         }
275     }
276
277     class TestingThread extends Thread {
278
279         private final String clientId;
280         private final int attempts;
281         private Optional<Exception> thrownException;
282
283         TestingThread(String clientId, int attempts) {
284             this.clientId = clientId;
285             this.attempts = attempts;
286             setName("client-" + clientId);
287         }
288
289         @Override
290         public void run() {
291             try {
292                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
293                 long sessionId = netconfClient.getSessionId();
294                 logger.info("Client with sessionid {} hello exchanged", sessionId);
295
296                 final NetconfMessage getMessage = XmlFileLoader
297                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
298                 NetconfMessage result = netconfClient.sendMessage(getMessage);
299                 logger.info("Client with sessionid {} got result {}", sessionId, result);
300                 netconfClient.close();
301                 logger.info("Client with session id {} ended", sessionId);
302                 thrownException = Optional.absent();
303             } catch (final Exception e) {
304                 thrownException = Optional.of(e);
305             }
306         }
307     }
308 }