Bug 624 - Separate netty and exi specific classes from netconf-util.
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.EventLoopGroup;
23 import io.netty.channel.nio.NioEventLoopGroup;
24 import io.netty.util.HashedWheelTimer;
25 import io.netty.util.concurrent.GlobalEventExecutor;
26 import java.io.DataOutputStream;
27 import java.io.InputStream;
28 import java.io.InputStreamReader;
29 import java.lang.management.ManagementFactory;
30 import java.net.InetSocketAddress;
31 import java.net.Socket;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Set;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.ThreadFactory;
42 import java.util.concurrent.atomic.AtomicLong;
43 import org.apache.commons.io.IOUtils;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.junit.runners.Parameterized;
51 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
52 import org.opendaylight.controller.netconf.api.NetconfMessage;
53 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
54 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
55 import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
56 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
57 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
58 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
59 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
60 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
61 import org.opendaylight.controller.netconf.mapping.api.Capability;
62 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
63 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
64 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
65 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
66 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
67 import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
68 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
69 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
70 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
71 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
72 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
73 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76 import org.w3c.dom.Document;
77
78 @RunWith(Parameterized.class)
79 public class ConcurrentClientsTest {
80     private static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
81
82     private static ExecutorService clientExecutor;
83
84     private static final int CONCURRENCY = 32;
85     private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
86
87     private int nettyThreads;
88     private Class<? extends Runnable> clientRunnable;
89     private Set<String> serverCaps;
90
91     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
92         this.nettyThreads = nettyThreads;
93         this.clientRunnable = clientRunnable;
94         this.serverCaps = serverCaps;
95     }
96
97     @Parameterized.Parameters()
98     public static Collection<Object[]> data() {
99         return Arrays.asList(new Object[][]{
100                 {4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
101                 {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
102                 // empty set of capabilities = only base 1.0 netconf capability
103                 {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
104                 {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
105                 {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
106
107                 {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
108                 {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
109         });
110     }
111
112     private EventLoopGroup nettyGroup;
113     private NetconfClientDispatcher netconfClientDispatcher;
114
115     private DefaultCommitNotificationProducer commitNot;
116
117     HashedWheelTimer hashedWheelTimer;
118     private TestingNetconfOperation testingNetconfOperation;
119
120     public static SessionMonitoringService createMockedMonitoringService() {
121         SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
122         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
123         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
124         return monitoring;
125     }
126
127     @BeforeClass
128     public static void setUpClientExecutor() {
129         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
130             int i = 1;
131
132             @Override
133             public Thread newThread(final Runnable r) {
134                 Thread thread = new Thread(r);
135                 thread.setName("client-" + i++);
136                 thread.setDaemon(true);
137                 return thread;
138             }
139         });
140     }
141
142     @Before
143     public void setUp() throws Exception {
144         hashedWheelTimer = new HashedWheelTimer();
145         nettyGroup = new NioEventLoopGroup(nettyThreads);
146         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
147
148         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
149
150         testingNetconfOperation = new TestingNetconfOperation();
151         factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
152
153         SessionIdProvider idProvider = new SessionIdProvider();
154
155         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
156                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
157
158         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
159
160         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
161         final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
162
163         ChannelFuture s = dispatch.createServer(netconfAddress);
164         s.await();
165     }
166
167     @After
168     public void tearDown(){
169         commitNot.close();
170         hashedWheelTimer.stop();
171         try {
172             nettyGroup.shutdownGracefully().get();
173         } catch (InterruptedException | ExecutionException e) {
174             logger.warn("Ignoring exception while cleaning up after test", e);
175         }
176     }
177
178     @AfterClass
179     public static void tearDownClientExecutor() {
180         clientExecutor.shutdownNow();
181     }
182
183     @Test(timeout = CONCURRENCY * 1000)
184     public void testConcurrentClients() throws Exception {
185
186         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
187
188         for (int i = 0; i < CONCURRENCY; i++) {
189             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
190         }
191
192         for (Future<?> future : futures) {
193             try {
194                 future.get();
195             } catch (InterruptedException e) {
196                 throw new IllegalStateException(e);
197             } catch (ExecutionException e) {
198                 logger.error("Thread for testing client failed", e);
199                 fail("Client failed: " + e.getMessage());
200             }
201         }
202
203         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
204     }
205
206     public static Set<String> getOnlyExiServerCaps() {
207         return Sets.newHashSet(
208                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
209                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
210         );
211     }
212
213     public static Set<String> getOnlyChunkServerCaps() {
214         return Sets.newHashSet(
215                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
216                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
217         );
218     }
219
220     public Runnable getInstanceOfClientRunnable() throws Exception {
221         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
222     }
223
224     /**
225      * Responds to all operations except start-exi and counts all requests
226      */
227     private static class TestingNetconfOperation implements NetconfOperation {
228
229         private final AtomicLong counter = new AtomicLong();
230
231         @Override
232         public HandlingPriority canHandle(Document message) {
233             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
234                     HandlingPriority.CANNOT_HANDLE :
235                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
236         }
237
238         @Override
239         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
240             try {
241                 logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
242                 counter.getAndIncrement();
243                 return XmlUtil.readXmlToDocument("<test/>");
244             } catch (Exception e) {
245                 throw new RuntimeException(e);
246             }
247         }
248
249         public long getMessageCount() {
250             return counter.get();
251         }
252     }
253
254     /**
255      * Hardcoded operation service factory
256      */
257     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
258         private final NetconfOperation[] operations;
259
260         public TestingOperationServiceFactory(final NetconfOperation... operations) {
261             this.operations = operations;
262         }
263
264         @Override
265         public NetconfOperationService createService(String netconfSessionIdForReporting) {
266             return new NetconfOperationService() {
267                 @Override
268                 public Set<Capability> getCapabilities() {
269                     return Collections.emptySet();
270                 }
271
272                 @Override
273                 public Set<NetconfOperation> getNetconfOperations() {
274                     return Sets.newHashSet(operations);
275                 }
276
277                 @Override
278                 public void close() {}
279             };
280         }
281     }
282
283     /**
284      * Pure socket based blocking client
285      */
286     public final class BlockingClientRunnable implements Runnable {
287
288         @Override
289         public void run() {
290             try {
291                 run2();
292             } catch (Exception e) {
293                 throw new IllegalStateException(Thread.currentThread().getName(), e);
294             }
295         }
296
297         private void run2() throws Exception {
298             InputStream clientHello = checkNotNull(XmlFileLoader
299                     .getResourceAsStream("netconfMessages/client_hello.xml"));
300             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
301
302             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
303             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
304             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
305
306             StringBuffer sb = new StringBuffer();
307             while (sb.toString().endsWith("]]>]]>") == false) {
308                 sb.append((char) inFromServer.read());
309             }
310             logger.info(sb.toString());
311
312             outToServer.write(IOUtils.toByteArray(clientHello));
313             outToServer.write("]]>]]>".getBytes());
314             outToServer.flush();
315             // Thread.sleep(100);
316             outToServer.write(IOUtils.toByteArray(getConfig));
317             outToServer.write("]]>]]>".getBytes());
318             outToServer.flush();
319             Thread.sleep(100);
320             sb = new StringBuffer();
321             while (sb.toString().endsWith("]]>]]>") == false) {
322                 sb.append((char) inFromServer.read());
323             }
324             logger.info(sb.toString());
325             clientSocket.close();
326         }
327     }
328
329     /**
330      * TestingNetconfClient based runnable
331      */
332     public final class TestingNetconfClientRunnable implements Runnable {
333
334         @Override
335         public void run() {
336             try {
337                 final TestingNetconfClient netconfClient =
338                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
339                 long sessionId = netconfClient.getSessionId();
340                 logger.info("Client with session id {}: hello exchanged", sessionId);
341
342                 final NetconfMessage getMessage = XmlFileLoader
343                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
344                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
345                 logger.info("Client with session id {}: got result {}", sessionId, result);
346
347                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
348                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
349                                 + XmlUtil.toString(getMessage.getDocument()));
350
351                 netconfClient.close();
352                 logger.info("Client with session id {}: ended", sessionId);
353             } catch (final Exception e) {
354                 throw new IllegalStateException(Thread.currentThread().getName(), e);
355             }
356         }
357
358         private NetconfClientConfiguration getClientConfig() {
359             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
360             b.withAddress(netconfAddress);
361             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
362                     "client"));
363             b.withSessionListener(new SimpleNetconfClientSessionListener());
364             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
365                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
366             return b.build();
367         }
368     }
369 }