Improve iteration over subnets map by using entrySet instead of keySet.
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
import org.opendaylight.controller.config.yang.store.api.YangStoreService;
import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
import org.opendaylight.controller.netconf.client.NetconfClient;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;

import javax.management.ObjectName;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
66
67 public class ConcurrentClientsTest {
68
69     private static final int CONCURRENCY = 16;
70     private static EventLoopGroup nettyGroup = new NioEventLoopGroup();
71     public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher( nettyGroup, nettyGroup);
72
73     @Mock
74     private YangStoreService yangStoreService;
75     @Mock
76     private ConfigRegistryJMXClient jmxClient;
77
78     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
79
80     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
81
82     private DefaultCommitNotificationProducer commitNot;
83     private NetconfServerDispatcher dispatch;
84
85     @Mock
86     private SessionMonitoringService monitoring;
87
88     @Before
89     public void setUp() throws Exception {
90         { // init mocks
91             MockitoAnnotations.initMocks(this);
92             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
93             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
94             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
95             doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
96
97             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
98             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
99
100             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
101         }
102
103         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
104         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
105
106         SessionIdProvider idProvider = new SessionIdProvider();
107         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
108                 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
109
110         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
111
112         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
113         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
114
115         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
116                 factoriesListener, commitNot, idProvider, monitoring);
117         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
118         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
119
120         ChannelFuture s = dispatch.createServer(netconfAddress);
121         s.await();
122     }
123
124     @AfterClass
125     public static void tearDownStatic() {
126         nettyGroup.shutdownGracefully();
127     }
128
129     private NetconfOperationServiceFactory mockOpF() {
130         return new NetconfOperationServiceFactory() {
131             @Override
132             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
133                 return new NetconfOperationService() {
134                     @Override
135                     public Set<Capability> getCapabilities() {
136                         return Collections.emptySet();
137                     }
138
139                     @Override
140                     public Set<NetconfOperation> getNetconfOperations() {
141                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
142                             @Override
143                             public HandlingPriority canHandle(Document message) {
144                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
145                             }
146
147                             @Override
148                             public Document handle(Document message, NetconfOperationRouter operationRouter)
149                                     throws NetconfDocumentedException {
150                                 try {
151                                     return XmlUtil.readXmlToDocument("<test/>");
152                                 } catch (Exception e) {
153                                     throw new RuntimeException(e);
154                                 }
155                             }
156                         });
157                     }
158
159                     @Override
160                     public Set<NetconfOperationFilter> getFilters() {
161                         return Collections.emptySet();
162                     }
163
164                     @Override
165                     public void close() {
166                     }
167                 };
168             }
169         };
170     }
171
172     @After
173     public void cleanUp() throws Exception {
174         commitNot.close();
175     }
176
177     @Test
178     public void multipleClients() throws Exception {
179         List<TestingThread> threads = new ArrayList<>();
180
181         final int attempts = 5;
182         for (int i = 0; i < CONCURRENCY; i++) {
183             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
184             threads.add(thread);
185             thread.start();
186         }
187
188         for (TestingThread thread : threads) {
189             thread.join();
190             if(thread.thrownException.isPresent()) {
191                 Exception exception = thread.thrownException.get();
192                 logger.error("Thread for testing client failed", exception);
193                 fail("Client thread " + thread + " failed: " + exception.getMessage());
194             }
195         }
196     }
197
198     @Test
199     public void synchronizationTest() throws Exception {
200         new BlockingThread("foo").run2();
201     }
202
203     @Test
204     public void multipleBlockingClients() throws Exception {
205         List<BlockingThread> threads = new ArrayList<>();
206         for (int i = 0; i < CONCURRENCY; i++) {
207             BlockingThread thread = new BlockingThread(String.valueOf(i));
208             threads.add(thread);
209             thread.start();
210         }
211
212         for (BlockingThread thread : threads) {
213             thread.join();
214             if(thread.thrownException.isPresent()) {
215                 Exception exception = thread.thrownException.get();
216                 logger.error("Thread for testing client failed", exception);
217                 fail("Client thread " + thread + " failed: " + exception.getMessage());
218             }
219         }
220     }
221
222     class BlockingThread extends Thread {
223         private Optional<Exception> thrownException;
224
225         public BlockingThread(String name) {
226             super("client-" + name);
227         }
228
229         @Override
230         public void run() {
231             try {
232                 run2();
233                 thrownException = Optional.absent();
234             } catch (Exception e) {
235                 thrownException = Optional.of(e);
236             }
237         }
238
239         private void run2() throws Exception {
240             InputStream clientHello = checkNotNull(XmlFileLoader
241                     .getResourceAsStream("netconfMessages/client_hello.xml"));
242             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
243
244             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
245             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
246             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
247
248             StringBuffer sb = new StringBuffer();
249             while (sb.toString().endsWith("]]>]]>") == false) {
250                 sb.append((char) inFromServer.read());
251             }
252             logger.info(sb.toString());
253
254             outToServer.write(IOUtils.toByteArray(clientHello));
255             outToServer.write("]]>]]>".getBytes());
256             outToServer.flush();
257             // Thread.sleep(100);
258             outToServer.write(IOUtils.toByteArray(getConfig));
259             outToServer.write("]]>]]>".getBytes());
260             outToServer.flush();
261             Thread.sleep(100);
262             sb = new StringBuffer();
263             while (sb.toString().endsWith("]]>]]>") == false) {
264                 sb.append((char) inFromServer.read());
265             }
266             logger.info(sb.toString());
267             clientSocket.close();
268         }
269     }
270
271     class TestingThread extends Thread {
272
273         private final String clientId;
274         private final int attempts;
275         private Optional<Exception> thrownException;
276
277         TestingThread(String clientId, int attempts) {
278             this.clientId = clientId;
279             this.attempts = attempts;
280             setName("client-" + clientId);
281         }
282
283         @Override
284         public void run() {
285             try {
286                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
287                 long sessionId = netconfClient.getSessionId();
288                 logger.info("Client with sessionid {} hello exchanged", sessionId);
289
290                 final NetconfMessage getMessage = XmlFileLoader
291                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
292                 NetconfMessage result = netconfClient.sendMessage(getMessage);
293                 logger.info("Client with sessionid {} got result {}", sessionId, result);
294                 netconfClient.close();
295                 logger.info("Client with session id {} ended", sessionId);
296                 thrownException = Optional.absent();
297             } catch (final Exception e) {
298                 thrownException = Optional.of(e);
299             }
300         }
301     }
302 }