b363976aaed6b6afd8f3aa4c9844ffb5357a661c
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.AfterClass;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mock;
23 import org.mockito.MockitoAnnotations;
24 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
25 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
26 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
27 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
28 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
29 import org.opendaylight.controller.netconf.api.NetconfMessage;
30 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
31 import org.opendaylight.controller.netconf.client.NetconfClient;
32 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
34 import org.opendaylight.controller.netconf.mapping.api.Capability;
35 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
36 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
37 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
38 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
39 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
40 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
41 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.w3c.dom.Document;
45
46 import javax.management.ObjectName;
47 import javax.net.ssl.SSLContext;
48 import java.io.DataOutputStream;
49 import java.io.InputStream;
50 import java.io.InputStreamReader;
51 import java.lang.management.ManagementFactory;
52 import java.net.InetSocketAddress;
53 import java.net.Socket;
54 import java.util.ArrayList;
55 import java.util.Collections;
56 import java.util.List;
57 import java.util.Set;
58 import java.util.concurrent.TimeUnit;
59
60 import static com.google.common.base.Preconditions.checkNotNull;
61 import static org.junit.Assert.fail;
62 import static org.mockito.Matchers.any;
63 import static org.mockito.Mockito.doReturn;
64 import static org.mockito.Mockito.mock;
65
66 public class ConcurrentClientsTest {
67
68     private static final int CONCURRENCY = 16;
69     private static EventLoopGroup nettyGroup = new NioEventLoopGroup();
70     public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher(
71             Optional.<SSLContext> absent(), nettyGroup, nettyGroup);
72
73     @Mock
74     private YangStoreService yangStoreService;
75     @Mock
76     private ConfigRegistryJMXClient jmxClient;
77
78     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
79
80     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
81
82     private DefaultCommitNotificationProducer commitNot;
83     private NetconfServerDispatcher dispatch;
84
85
86     @Before
87     public void setUp() throws Exception {
88         { // init mocks
89             MockitoAnnotations.initMocks(this);
90             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
91             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
92             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
93             doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
94
95             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
96             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
97
98             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
99         }
100
101         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
102         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
103
104         SessionIdProvider idProvider = new SessionIdProvider();
105         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
106                 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
107
108         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
109
110         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
111                 factoriesListener, commitNot, idProvider);
112         NetconfServerDispatcher.ServerSslChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerSslChannelInitializer(
113                 Optional.<SSLContext> absent(), serverNegotiatorFactory, listenerFactory);
114         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
115
116         ChannelFuture s = dispatch.createServer(netconfAddress);
117         s.await();
118     }
119
120     @AfterClass
121     public static void tearDownStatic() {
122         nettyGroup.shutdownGracefully();
123     }
124
125     private NetconfOperationServiceFactory mockOpF() {
126         return new NetconfOperationServiceFactory() {
127             @Override
128             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
129                 return new NetconfOperationService() {
130                     @Override
131                     public Set<Capability> getCapabilities() {
132                         return Collections.emptySet();
133                     }
134
135                     @Override
136                     public Set<NetconfOperation> getNetconfOperations() {
137                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
138                             @Override
139                             public HandlingPriority canHandle(Document message) {
140                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
141                             }
142
143                             @Override
144                             public Document handle(Document message, NetconfOperationRouter operationRouter)
145                                     throws NetconfDocumentedException {
146                                 try {
147                                     return XmlUtil.readXmlToDocument("<test/>");
148                                 } catch (Exception e) {
149                                     throw new RuntimeException(e);
150                                 }
151                             }
152                         });
153                     }
154
155                     @Override
156                     public Set<NetconfOperationFilter> getFilters() {
157                         return Collections.emptySet();
158                     }
159
160                     @Override
161                     public void close() {
162                     }
163                 };
164             }
165         };
166     }
167
168     @After
169     public void cleanUp() throws Exception {
170         commitNot.close();
171     }
172
173     @Test
174     public void multipleClients() throws Exception {
175         List<TestingThread> threads = new ArrayList<>();
176
177         final int attempts = 5;
178         for (int i = 0; i < CONCURRENCY; i++) {
179             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
180             threads.add(thread);
181             thread.start();
182         }
183
184         for (TestingThread thread : threads) {
185             thread.join();
186             if(thread.thrownException.isPresent()) {
187                 Exception exception = thread.thrownException.get();
188                 logger.error("Thread for testing client failed", exception);
189                 fail("Client thread " + thread + " failed: " + exception.getMessage());
190             }
191         }
192     }
193
194     @Test
195     public void synchronizationTest() throws Exception {
196         new BlockingThread("foo").run2();
197     }
198
199     @Test
200     public void multipleBlockingClients() throws Exception {
201         List<BlockingThread> threads = new ArrayList<>();
202         for (int i = 0; i < CONCURRENCY; i++) {
203             BlockingThread thread = new BlockingThread(String.valueOf(i));
204             threads.add(thread);
205             thread.start();
206         }
207
208         for (BlockingThread thread : threads) {
209             thread.join();
210             if(thread.thrownException.isPresent()) {
211                 Exception exception = thread.thrownException.get();
212                 logger.error("Thread for testing client failed", exception);
213                 fail("Client thread " + thread + " failed: " + exception.getMessage());
214             }
215         }
216     }
217
218     class BlockingThread extends Thread {
219         private Optional<Exception> thrownException;
220
221         public BlockingThread(String name) {
222             super("client-" + name);
223         }
224
225         @Override
226         public void run() {
227             try {
228                 run2();
229                 thrownException = Optional.absent();
230             } catch (Exception e) {
231                 thrownException = Optional.of(e);
232             }
233         }
234
235         private void run2() throws Exception {
236             InputStream clientHello = checkNotNull(XmlFileLoader
237                     .getResourceAsStream("netconfMessages/client_hello.xml"));
238             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
239
240             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
241             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
242             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
243
244             StringBuffer sb = new StringBuffer();
245             while (sb.toString().endsWith("]]>]]>") == false) {
246                 sb.append((char) inFromServer.read());
247             }
248             logger.info(sb.toString());
249
250             outToServer.write(IOUtils.toByteArray(clientHello));
251             outToServer.write("]]>]]>".getBytes());
252             outToServer.flush();
253             // Thread.sleep(100);
254             outToServer.write(IOUtils.toByteArray(getConfig));
255             outToServer.write("]]>]]>".getBytes());
256             outToServer.flush();
257             Thread.sleep(100);
258             sb = new StringBuffer();
259             while (sb.toString().endsWith("]]>]]>") == false) {
260                 sb.append((char) inFromServer.read());
261             }
262             logger.info(sb.toString());
263             clientSocket.close();
264         }
265     }
266
267     class TestingThread extends Thread {
268
269         private final String clientId;
270         private final int attempts;
271         private Optional<Exception> thrownException;
272
273         TestingThread(String clientId, int attempts) {
274             this.clientId = clientId;
275             this.attempts = attempts;
276             setName("client-" + clientId);
277         }
278
279         @Override
280         public void run() {
281             try {
282                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
283                 long sessionId = netconfClient.getSessionId();
284                 logger.info("Client with sessionid {} hello exchanged", sessionId);
285
286                 final NetconfMessage getMessage = XmlFileLoader
287                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
288                 NetconfMessage result = netconfClient.sendMessage(getMessage);
289                 logger.info("Client with sessionid {} got result {}", sessionId, result);
290                 netconfClient.close();
291                 logger.info("Client with session id {} ended", sessionId);
292                 thrownException = Optional.absent();
293             } catch (final Exception e) {
294                 thrownException = Optional.of(e);
295             }
296         }
297     }
298 }