35abee564046d5249f47cc3efd18090bc39d963b
[netconf.git] / netconf / netconf-impl / src / test / java / org / opendaylight / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anySetOf;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Sets;
23 import com.google.common.io.ByteStreams;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.EventLoopGroup;
26 import io.netty.channel.nio.NioEventLoopGroup;
27 import io.netty.util.HashedWheelTimer;
28 import io.netty.util.concurrent.GlobalEventExecutor;
29 import java.io.DataOutputStream;
30 import java.io.InputStream;
31 import java.io.InputStreamReader;
32 import java.net.InetSocketAddress;
33 import java.net.Socket;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Set;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.atomic.AtomicLong;
45 import org.junit.After;
46 import org.junit.AfterClass;
47 import org.junit.Before;
48 import org.junit.BeforeClass;
49 import org.junit.Test;
50 import org.junit.runner.RunWith;
51 import org.junit.runners.Parameterized;
52 import org.opendaylight.controller.config.util.capability.Capability;
53 import org.opendaylight.controller.config.util.xml.DocumentedException;
54 import org.opendaylight.controller.config.util.xml.XmlUtil;
55 import org.opendaylight.netconf.api.NetconfMessage;
56 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
57 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
58 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
59 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
60 import org.opendaylight.netconf.client.NetconfClientDispatcher;
61 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
62 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
63 import org.opendaylight.netconf.client.TestingNetconfClient;
64 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
65 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
66 import org.opendaylight.netconf.impl.osgi.AggregatedNetconfOperationServiceFactory;
67 import org.opendaylight.netconf.mapping.api.HandlingPriority;
68 import org.opendaylight.netconf.mapping.api.NetconfOperation;
69 import org.opendaylight.netconf.mapping.api.NetconfOperationChainedExecution;
70 import org.opendaylight.netconf.mapping.api.NetconfOperationService;
71 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
72 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
73 import org.opendaylight.netconf.util.messages.NetconfMessageUtil;
74 import org.opendaylight.netconf.util.test.XmlFileLoader;
75 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80 import org.w3c.dom.Document;
81
82 @RunWith(Parameterized.class)
83 public class ConcurrentClientsTest {
84     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
85
86     private static ExecutorService clientExecutor;
87
88     private static final int CONCURRENCY = 32;
89     private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
90
91     private int nettyThreads;
92     private Class<? extends Runnable> clientRunnable;
93     private Set<String> serverCaps;
94
95     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
96         this.nettyThreads = nettyThreads;
97         this.clientRunnable = clientRunnable;
98         this.serverCaps = serverCaps;
99     }
100
101     @Parameterized.Parameters()
102     public static Collection<Object[]> data() {
103         return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
104                                             {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
105                                             // empty set of capabilities = only base 1.0 netconf capability
106                                             {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
107                                             {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
108                                             {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
109                                             {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
110                                             {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
111         });
112     }
113
114     private EventLoopGroup nettyGroup;
115     private NetconfClientDispatcher netconfClientDispatcher;
116
117     HashedWheelTimer hashedWheelTimer;
118     private TestingNetconfOperation testingNetconfOperation;
119
120     public static NetconfMonitoringService createMockedMonitoringService() {
121         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
122         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
123         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
124         doReturn(new AutoCloseable() {
125             @Override
126             public void close() throws Exception {
127
128             }
129         }).when(monitoring).registerListener(any(NetconfMonitoringService.MonitoringListener.class));
130         doNothing().when(monitoring).onCapabilitiesChanged(anySetOf(Capability.class), anySetOf(Capability.class));
131         doReturn(new CapabilitiesBuilder().setCapability(Collections.<Uri>emptyList()).build()).when(monitoring).getCapabilities();
132         return monitoring;
133     }
134
135     @BeforeClass
136     public static void setUpClientExecutor() {
137         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
138             int i = 1;
139
140             @Override
141             public Thread newThread(final Runnable r) {
142                 Thread thread = new Thread(r);
143                 thread.setName("client-" + i++);
144                 thread.setDaemon(true);
145                 return thread;
146             }
147         });
148     }
149
150     @Before
151     public void setUp() throws Exception {
152         hashedWheelTimer = new HashedWheelTimer();
153         nettyGroup = new NioEventLoopGroup(nettyThreads);
154         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
155
156         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
157
158         testingNetconfOperation = new TestingNetconfOperation();
159         factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
160
161         SessionIdProvider idProvider = new SessionIdProvider();
162
163         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactoryBuilder()
164                 .setTimer(hashedWheelTimer)
165                 .setAggregatedOpService(factoriesListener)
166                 .setIdProvider(idProvider)
167                 .setConnectionTimeoutMillis(5000)
168                 .setMonitoringService(createMockedMonitoringService())
169                 .setBaseCapabilities(serverCaps)
170                 .build();
171
172         NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcherImpl.ServerChannelInitializer(serverNegotiatorFactory);
173         final NetconfServerDispatcherImpl dispatch = new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
174
175         ChannelFuture s = dispatch.createServer(netconfAddress);
176         s.await();
177     }
178
179     @After
180     public void tearDown(){
181         hashedWheelTimer.stop();
182         try {
183             nettyGroup.shutdownGracefully().get();
184         } catch (InterruptedException | ExecutionException e) {
185             LOG.warn("Ignoring exception while cleaning up after test", e);
186         }
187     }
188
189     @AfterClass
190     public static void tearDownClientExecutor() {
191         clientExecutor.shutdownNow();
192     }
193
194     @Test(timeout = CONCURRENCY * 1000)
195     public void testConcurrentClients() throws Exception {
196
197         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
198
199         for (int i = 0; i < CONCURRENCY; i++) {
200             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
201         }
202
203         for (Future<?> future : futures) {
204             try {
205                 future.get();
206             } catch (InterruptedException e) {
207                 throw new IllegalStateException(e);
208             } catch (ExecutionException e) {
209                 LOG.error("Thread for testing client failed", e);
210                 fail("Client failed: " + e.getMessage());
211             }
212         }
213
214         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
215     }
216
217     public static Set<String> getOnlyExiServerCaps() {
218         return Sets.newHashSet(
219                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
220                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
221         );
222     }
223
224     public static Set<String> getOnlyChunkServerCaps() {
225         return Sets.newHashSet(
226                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
227                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
228         );
229     }
230
231     public Runnable getInstanceOfClientRunnable() throws Exception {
232         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
233     }
234
235     /**
236      * Responds to all operations except start-exi and counts all requests
237      */
238     private static class TestingNetconfOperation implements NetconfOperation {
239
240         private final AtomicLong counter = new AtomicLong();
241
242         @Override
243         public HandlingPriority canHandle(Document message) {
244             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
245                     HandlingPriority.CANNOT_HANDLE :
246                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
247         }
248
249         @Override
250         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
251             try {
252                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
253                 counter.getAndIncrement();
254                 return XmlUtil.readXmlToDocument("<test/>");
255             } catch (Exception e) {
256                 throw new RuntimeException(e);
257             }
258         }
259
260         public long getMessageCount() {
261             return counter.get();
262         }
263     }
264
265     /**
266      * Hardcoded operation service factory
267      */
268     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
269         private final NetconfOperation[] operations;
270
271         public TestingOperationServiceFactory(final NetconfOperation... operations) {
272             this.operations = operations;
273         }
274
275         @Override
276         public Set<Capability> getCapabilities() {
277             return Collections.emptySet();
278         }
279
280         @Override
281         public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
282             return new AutoCloseable(){
283                 @Override
284                 public void close() throws Exception {}
285             };
286         }
287
288         @Override
289         public NetconfOperationService createService(String netconfSessionIdForReporting) {
290             return new NetconfOperationService() {
291
292                 @Override
293                 public Set<NetconfOperation> getNetconfOperations() {
294                     return Sets.newHashSet(operations);
295                 }
296
297                 @Override
298                 public void close() {}
299             };
300         }
301     }
302
303     /**
304      * Pure socket based blocking client
305      */
306     public final class BlockingClientRunnable implements Runnable {
307
308         @Override
309         public void run() {
310             try {
311                 run2();
312             } catch (Exception e) {
313                 throw new IllegalStateException(Thread.currentThread().getName(), e);
314             }
315         }
316
317         private void run2() throws Exception {
318             InputStream clientHello = checkNotNull(XmlFileLoader
319                     .getResourceAsStream("netconfMessages/client_hello.xml"));
320             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
321
322             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
323             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
324             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
325
326             StringBuffer sb = new StringBuffer();
327             while (sb.toString().endsWith("]]>]]>") == false) {
328                 sb.append((char) inFromServer.read());
329             }
330             LOG.info(sb.toString());
331
332             outToServer.write(ByteStreams.toByteArray(clientHello));
333             outToServer.write("]]>]]>".getBytes());
334             outToServer.flush();
335             // Thread.sleep(100);
336             outToServer.write(ByteStreams.toByteArray(getConfig));
337             outToServer.write("]]>]]>".getBytes());
338             outToServer.flush();
339             Thread.sleep(100);
340             sb = new StringBuffer();
341             while (sb.toString().endsWith("]]>]]>") == false) {
342                 sb.append((char) inFromServer.read());
343             }
344             LOG.info(sb.toString());
345             clientSocket.close();
346         }
347     }
348
349     /**
350      * TestingNetconfClient based runnable
351      */
352     public final class TestingNetconfClientRunnable implements Runnable {
353
354         @Override
355         public void run() {
356             try {
357                 final TestingNetconfClient netconfClient =
358                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
359                 long sessionId = netconfClient.getSessionId();
360                 LOG.info("Client with session id {}: hello exchanged", sessionId);
361
362                 final NetconfMessage getMessage = XmlFileLoader
363                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
364                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
365                 LOG.info("Client with session id {}: got result {}", sessionId, result);
366
367                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
368                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
369                                 + XmlUtil.toString(getMessage.getDocument()));
370
371                 netconfClient.close();
372                 LOG.info("Client with session id {}: ended", sessionId);
373             } catch (final Exception e) {
374                 throw new IllegalStateException(Thread.currentThread().getName(), e);
375             }
376         }
377
378         private NetconfClientConfiguration getClientConfig() {
379             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
380             b.withAddress(netconfAddress);
381             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
382                     "client"));
383             b.withSessionListener(new SimpleNetconfClientSessionListener());
384             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
385                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
386             return b.build();
387         }
388     }
389 }