Merge "Bug 8153: Enforce check-style rules for netconf - netconf-notification-api"
[netconf.git] / netconf / netconf-impl / src / test / java / org / opendaylight / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21 import com.google.common.io.ByteStreams;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.nio.NioEventLoopGroup;
25 import io.netty.util.HashedWheelTimer;
26 import io.netty.util.concurrent.GlobalEventExecutor;
27 import java.io.DataOutputStream;
28 import java.io.InputStream;
29 import java.io.InputStreamReader;
30 import java.net.InetSocketAddress;
31 import java.net.Socket;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Set;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.ThreadFactory;
42 import java.util.concurrent.atomic.AtomicLong;
43 import org.junit.After;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.junit.runners.Parameterized;
50 import org.opendaylight.controller.config.util.capability.Capability;
51 import org.opendaylight.controller.config.util.xml.DocumentedException;
52 import org.opendaylight.controller.config.util.xml.XmlUtil;
53 import org.opendaylight.netconf.api.NetconfMessage;
54 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
55 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
56 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
57 import org.opendaylight.netconf.api.monitoring.SessionEvent;
58 import org.opendaylight.netconf.api.monitoring.SessionListener;
59 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
60 import org.opendaylight.netconf.client.NetconfClientDispatcher;
61 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
62 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
63 import org.opendaylight.netconf.client.TestingNetconfClient;
64 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
65 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
66 import org.opendaylight.netconf.impl.osgi.AggregatedNetconfOperationServiceFactory;
67 import org.opendaylight.netconf.mapping.api.HandlingPriority;
68 import org.opendaylight.netconf.mapping.api.NetconfOperation;
69 import org.opendaylight.netconf.mapping.api.NetconfOperationChainedExecution;
70 import org.opendaylight.netconf.mapping.api.NetconfOperationService;
71 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
72 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
73 import org.opendaylight.netconf.util.messages.NetconfMessageUtil;
74 import org.opendaylight.netconf.util.test.XmlFileLoader;
75 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80 import org.w3c.dom.Document;
81
82 @RunWith(Parameterized.class)
83 public class ConcurrentClientsTest {
84     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
85
86     private static ExecutorService clientExecutor;
87
88     private static final int CONCURRENCY = 32;
89     private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
90
91     private int nettyThreads;
92     private Class<? extends Runnable> clientRunnable;
93     private Set<String> serverCaps;
94
95     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
96         this.nettyThreads = nettyThreads;
97         this.clientRunnable = clientRunnable;
98         this.serverCaps = serverCaps;
99     }
100
101     @Parameterized.Parameters()
102     public static Collection<Object[]> data() {
103         return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class,
104                 NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
105             {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
106             // empty set of capabilities = only base 1.0 netconf capability
107             {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
108             {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
109             {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
110             {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
111             {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
112         });
113     }
114
    // Event loop group created in setUp(); handed to both the client dispatcher and the server dispatcher.
    private EventLoopGroup nettyGroup;
    // Client dispatcher created in setUp(); TestingNetconfClientRunnable passes it to TestingNetconfClient.
    private NetconfClientDispatcher netconfClientDispatcher;

    // Timer created in setUp() and stopped in tearDown().
    HashedWheelTimer hashedWheelTimer;
    // Operation registered with the server in setUp(); the test asserts on its message count.
    private TestingNetconfOperation testingNetconfOperation;
120
121     public static NetconfMonitoringService createMockedMonitoringService() {
122         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
123         final SessionListener sessionListener = mock(SessionListener.class);
124         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
125         doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
126         doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
127         doReturn(new AutoCloseable() {
128             @Override
129             public void close() throws Exception {
130
131             }
132         }).when(monitoring).registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
133         doReturn(sessionListener).when(monitoring).getSessionListener();
134         doReturn(new CapabilitiesBuilder().setCapability(Collections.<Uri>emptyList()).build()).when(monitoring)
135                 .getCapabilities();
136         return monitoring;
137     }
138
139     @BeforeClass
140     public static void setUpClientExecutor() {
141         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
142             int index = 1;
143
144             @Override
145             public Thread newThread(final Runnable runnable) {
146                 Thread thread = new Thread(runnable);
147                 thread.setName("client-" + index++);
148                 thread.setDaemon(true);
149                 return thread;
150             }
151         });
152     }
153
154     @Before
155     public void setUp() throws Exception {
156         hashedWheelTimer = new HashedWheelTimer();
157         nettyGroup = new NioEventLoopGroup(nettyThreads);
158         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
159
160         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
161
162         testingNetconfOperation = new TestingNetconfOperation();
163         factoriesListener.onAddNetconfOperationServiceFactory(
164                 new TestingOperationServiceFactory(testingNetconfOperation));
165
166         SessionIdProvider idProvider = new SessionIdProvider();
167
168         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
169                 NetconfServerSessionNegotiatorFactoryBuilder()
170                 .setTimer(hashedWheelTimer)
171                 .setAggregatedOpService(factoriesListener)
172                 .setIdProvider(idProvider)
173                 .setConnectionTimeoutMillis(5000)
174                 .setMonitoringService(createMockedMonitoringService())
175                 .setBaseCapabilities(serverCaps)
176                 .build();
177
178         NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer =
179                 new NetconfServerDispatcherImpl.ServerChannelInitializer(serverNegotiatorFactory);
180         final NetconfServerDispatcherImpl dispatch =
181                 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
182
183         ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
184         server.await();
185     }
186
187     @After
188     public void tearDown() {
189         hashedWheelTimer.stop();
190         try {
191             nettyGroup.shutdownGracefully().get();
192         } catch (InterruptedException | ExecutionException e) {
193             LOG.warn("Ignoring exception while cleaning up after test", e);
194         }
195     }
196
197     @AfterClass
198     public static void tearDownClientExecutor() {
199         clientExecutor.shutdownNow();
200     }
201
202     @Test(timeout = CONCURRENCY * 1000)
203     public void testConcurrentClients() throws Exception {
204
205         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
206
207         for (int i = 0; i < CONCURRENCY; i++) {
208             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
209         }
210
211         for (Future<?> future : futures) {
212             try {
213                 future.get();
214             } catch (InterruptedException e) {
215                 throw new IllegalStateException(e);
216             } catch (ExecutionException e) {
217                 LOG.error("Thread for testing client failed", e);
218                 throw e;
219             }
220         }
221
222         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
223     }
224
225     public static Set<String> getOnlyExiServerCaps() {
226         return Sets.newHashSet(
227                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
228                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
229         );
230     }
231
232     public static Set<String> getOnlyChunkServerCaps() {
233         return Sets.newHashSet(
234                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
235                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
236         );
237     }
238
239     public Runnable getInstanceOfClientRunnable() throws Exception {
240         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
241     }
242
243     /**
244      * Responds to all operations except start-exi and counts all requests.
245      */
246     private static class TestingNetconfOperation implements NetconfOperation {
247
248         private final AtomicLong counter = new AtomicLong();
249
250         @Override
251         public HandlingPriority canHandle(Document message) {
252             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI)
253                     ? HandlingPriority.CANNOT_HANDLE :
254                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
255         }
256
257         @SuppressWarnings("checkstyle:IllegalCatch")
258         @Override
259         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation)
260                 throws DocumentedException {
261             try {
262                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
263                 counter.getAndIncrement();
264                 return XmlUtil.readXmlToDocument("<test/>");
265             } catch (Exception e) {
266                 throw new RuntimeException(e);
267             }
268         }
269
270         public long getMessageCount() {
271             return counter.get();
272         }
273     }
274
275     /**
276      * Hardcoded operation service factory.
277      */
278     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
279         private final NetconfOperation[] operations;
280
281         TestingOperationServiceFactory(final NetconfOperation... operations) {
282             this.operations = operations;
283         }
284
285         @Override
286         public Set<Capability> getCapabilities() {
287             return Collections.emptySet();
288         }
289
290         @Override
291         public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
292             return new AutoCloseable() {
293                 @Override
294                 public void close() throws Exception {
295                 }
296             };
297         }
298
299         @Override
300         public NetconfOperationService createService(String netconfSessionIdForReporting) {
301             return new NetconfOperationService() {
302
303                 @Override
304                 public Set<NetconfOperation> getNetconfOperations() {
305                     return Sets.newHashSet(operations);
306                 }
307
308                 @Override
309                 public void close() {
310                 }
311             };
312         }
313     }
314
315     /**
316      * Pure socket based blocking client.
317      */
318     @SuppressWarnings("checkstyle:IllegalCatch")
319     public final class BlockingClientRunnable implements Runnable {
320
321         @Override
322         public void run() {
323             try {
324                 run2();
325             } catch (Exception e) {
326                 throw new IllegalStateException(Thread.currentThread().getName(), e);
327             }
328         }
329
330         private void run2() throws Exception {
331             InputStream clientHello = checkNotNull(XmlFileLoader
332                     .getResourceAsStream("netconfMessages/client_hello.xml"));
333             final InputStream getConfig =
334                     checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
335
336             Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
337             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
338             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
339
340             StringBuffer sb = new StringBuffer();
341             while (sb.toString().endsWith("]]>]]>") == false) {
342                 sb.append((char) inFromServer.read());
343             }
344             LOG.info(sb.toString());
345
346             outToServer.write(ByteStreams.toByteArray(clientHello));
347             outToServer.write("]]>]]>".getBytes());
348             outToServer.flush();
349             // Thread.sleep(100);
350             outToServer.write(ByteStreams.toByteArray(getConfig));
351             outToServer.write("]]>]]>".getBytes());
352             outToServer.flush();
353             Thread.sleep(100);
354             sb = new StringBuffer();
355             while (sb.toString().endsWith("]]>]]>") == false) {
356                 sb.append((char) inFromServer.read());
357             }
358             LOG.info(sb.toString());
359             clientSocket.close();
360         }
361     }
362
363     /**
364      * TestingNetconfClient based runnable.
365      */
366     public final class TestingNetconfClientRunnable implements Runnable {
367
368         @SuppressWarnings("checkstyle:IllegalCatch")
369         @Override
370         public void run() {
371             try {
372                 final TestingNetconfClient netconfClient =
373                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
374                                 getClientConfig());
375                 long sessionId = netconfClient.getSessionId();
376                 LOG.info("Client with session id {}: hello exchanged", sessionId);
377
378                 final NetconfMessage getMessage = XmlFileLoader
379                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
380                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
381                 LOG.info("Client with session id {}: got result {}", sessionId, result);
382
383                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
384                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
385                                 + XmlUtil.toString(getMessage.getDocument()));
386
387                 netconfClient.close();
388                 LOG.info("Client with session id {}: ended", sessionId);
389             } catch (final Exception e) {
390                 throw new IllegalStateException(Thread.currentThread().getName(), e);
391             }
392         }
393
394         private NetconfClientConfiguration getClientConfig() {
395             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
396             b.withAddress(NETCONF_ADDRESS);
397             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
398                     "client"));
399             b.withSessionListener(new SimpleNetconfClientSessionListener());
400             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
401                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
402             return b.build();
403         }
404     }
405 }