Merge "Improve iteration over subnets map by using entrySet instead of keyset. It...
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.channel.nio.NioEventLoopGroup;
20 import io.netty.util.HashedWheelTimer;
21
22 import java.io.DataOutputStream;
23 import java.io.InputStream;
24 import java.io.InputStreamReader;
25 import java.lang.management.ManagementFactory;
26 import java.net.InetSocketAddress;
27 import java.net.Socket;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.management.ObjectName;
35
36 import org.apache.commons.io.IOUtils;
37 import org.junit.After;
38 import org.junit.AfterClass;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
44 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
45 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
46 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
47 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
48 import org.opendaylight.controller.netconf.api.NetconfMessage;
49 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
50 import org.opendaylight.controller.netconf.client.NetconfClient;
51 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
52 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
53 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
54 import org.opendaylight.controller.netconf.mapping.api.Capability;
55 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
56 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
57 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
58 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
59 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
60 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
61 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import org.w3c.dom.Document;
65
66 import com.google.common.base.Optional;
67 import com.google.common.collect.Sets;
68
69 public class ConcurrentClientsTest {
70
71     private static final int CONCURRENCY = 16;
72     private static EventLoopGroup nettyGroup = new NioEventLoopGroup();
73     public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER =
74             new NetconfClientDispatcher( nettyGroup, nettyGroup);
75
76     @Mock
77     private YangStoreService yangStoreService;
78     @Mock
79     private ConfigRegistryJMXClient jmxClient;
80
81     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
82
83     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
84
85     private DefaultCommitNotificationProducer commitNot;
86     private NetconfServerDispatcher dispatch;
87
88     @Mock
89     private SessionMonitoringService monitoring;
90
91     @Before
92     public void setUp() throws Exception {
93         { // init mocks
94             MockitoAnnotations.initMocks(this);
95             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
96             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
97             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
98             doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
99
100             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
101             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
102
103             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
104         }
105
106         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
107         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
108
109         SessionIdProvider idProvider = new SessionIdProvider();
110         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
111                 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
112
113         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
114
115         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
116         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
117
118         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
119                 factoriesListener, commitNot, idProvider, monitoring);
120         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
121         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
122
123         ChannelFuture s = dispatch.createServer(netconfAddress);
124         s.await();
125     }
126
127     @AfterClass
128     public static void tearDownStatic() {
129         nettyGroup.shutdownGracefully();
130     }
131
132     private NetconfOperationServiceFactory mockOpF() {
133         return new NetconfOperationServiceFactory() {
134             @Override
135             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
136                 return new NetconfOperationService() {
137                     @Override
138                     public Set<Capability> getCapabilities() {
139                         return Collections.emptySet();
140                     }
141
142                     @Override
143                     public Set<NetconfOperation> getNetconfOperations() {
144                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
145                             @Override
146                             public HandlingPriority canHandle(Document message) {
147                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
148                             }
149
150                             @Override
151                             public Document handle(Document message, NetconfOperationRouter operationRouter)
152                                     throws NetconfDocumentedException {
153                                 try {
154                                     return XmlUtil.readXmlToDocument("<test/>");
155                                 } catch (Exception e) {
156                                     throw new RuntimeException(e);
157                                 }
158                             }
159                         });
160                     }
161
162                     @Override
163                     public Set<NetconfOperationFilter> getFilters() {
164                         return Collections.emptySet();
165                     }
166
167                     @Override
168                     public void close() {
169                     }
170                 };
171             }
172         };
173     }
174
175     @After
176     public void cleanUp() throws Exception {
177         commitNot.close();
178     }
179
180     @Test
181     public void multipleClients() throws Exception {
182         List<TestingThread> threads = new ArrayList<>();
183
184         final int attempts = 5;
185         for (int i = 0; i < CONCURRENCY; i++) {
186             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
187             threads.add(thread);
188             thread.start();
189         }
190
191         for (TestingThread thread : threads) {
192             thread.join();
193             if(thread.thrownException.isPresent()) {
194                 Exception exception = thread.thrownException.get();
195                 logger.error("Thread for testing client failed", exception);
196                 fail("Client thread " + thread + " failed: " + exception.getMessage());
197             }
198         }
199     }
200
201     @Test
202     public void synchronizationTest() throws Exception {
203         new BlockingThread("foo").run2();
204     }
205
206     @Test
207     public void multipleBlockingClients() throws Exception {
208         List<BlockingThread> threads = new ArrayList<>();
209         for (int i = 0; i < CONCURRENCY; i++) {
210             BlockingThread thread = new BlockingThread(String.valueOf(i));
211             threads.add(thread);
212             thread.start();
213         }
214
215         for (BlockingThread thread : threads) {
216             thread.join();
217             if(thread.thrownException.isPresent()) {
218                 Exception exception = thread.thrownException.get();
219                 logger.error("Thread for testing client failed", exception);
220                 fail("Client thread " + thread + " failed: " + exception.getMessage());
221             }
222         }
223     }
224
225     class BlockingThread extends Thread {
226         private Optional<Exception> thrownException;
227
228         public BlockingThread(String name) {
229             super("client-" + name);
230         }
231
232         @Override
233         public void run() {
234             try {
235                 run2();
236                 thrownException = Optional.absent();
237             } catch (Exception e) {
238                 thrownException = Optional.of(e);
239             }
240         }
241
242         private void run2() throws Exception {
243             InputStream clientHello = checkNotNull(XmlFileLoader
244                     .getResourceAsStream("netconfMessages/client_hello.xml"));
245             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
246
247             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
248             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
249             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
250
251             StringBuffer sb = new StringBuffer();
252             while (sb.toString().endsWith("]]>]]>") == false) {
253                 sb.append((char) inFromServer.read());
254             }
255             logger.info(sb.toString());
256
257             outToServer.write(IOUtils.toByteArray(clientHello));
258             outToServer.write("]]>]]>".getBytes());
259             outToServer.flush();
260             // Thread.sleep(100);
261             outToServer.write(IOUtils.toByteArray(getConfig));
262             outToServer.write("]]>]]>".getBytes());
263             outToServer.flush();
264             Thread.sleep(100);
265             sb = new StringBuffer();
266             while (sb.toString().endsWith("]]>]]>") == false) {
267                 sb.append((char) inFromServer.read());
268             }
269             logger.info(sb.toString());
270             clientSocket.close();
271         }
272     }
273
274     class TestingThread extends Thread {
275
276         private final String clientId;
277         private final int attempts;
278         private Optional<Exception> thrownException;
279
280         TestingThread(String clientId, int attempts) {
281             this.clientId = clientId;
282             this.attempts = attempts;
283             setName("client-" + clientId);
284         }
285
286         @Override
287         public void run() {
288             try {
289                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
290                 long sessionId = netconfClient.getSessionId();
291                 logger.info("Client with sessionid {} hello exchanged", sessionId);
292
293                 final NetconfMessage getMessage = XmlFileLoader
294                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
295                 NetconfMessage result = netconfClient.sendMessage(getMessage);
296                 logger.info("Client with sessionid {} got result {}", sessionId, result);
297                 netconfClient.close();
298                 logger.info("Client with session id {} ended", sessionId);
299                 thrownException = Optional.absent();
300             } catch (final Exception e) {
301                 thrownException = Optional.of(e);
302             }
303         }
304     }
305 }