BUG-2635 Prepare netconf monitoring service for md-sal monitoring.
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.collect.Sets;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.nio.NioEventLoopGroup;
25 import io.netty.util.HashedWheelTimer;
26 import io.netty.util.concurrent.GlobalEventExecutor;
27 import java.io.DataOutputStream;
28 import java.io.InputStream;
29 import java.io.InputStreamReader;
30 import java.lang.management.ManagementFactory;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Set;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadFactory;
43 import java.util.concurrent.atomic.AtomicLong;
44 import org.apache.commons.io.IOUtils;
45 import org.junit.After;
46 import org.junit.AfterClass;
47 import org.junit.Before;
48 import org.junit.BeforeClass;
49 import org.junit.Test;
50 import org.junit.runner.RunWith;
51 import org.junit.runners.Parameterized;
52 import org.opendaylight.controller.netconf.api.Capability;
53 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
54 import org.opendaylight.controller.netconf.api.NetconfMessage;
55 import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener;
56 import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
57 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
58 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
59 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
60 import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
61 import org.opendaylight.controller.netconf.client.TestingNetconfClient;
62 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
63 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
64 import org.opendaylight.controller.netconf.impl.osgi.AggregatedNetconfOperationServiceFactory;
65 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
66 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
67 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
68 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
69 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
70 import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
71 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
72 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
73 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
74 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
75 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import org.w3c.dom.Document;
79
80 @RunWith(Parameterized.class)
81 public class ConcurrentClientsTest {
82     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
83
84     private static ExecutorService clientExecutor;
85
86     private static final int CONCURRENCY = 32;
87     private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
88
89     private int nettyThreads;
90     private Class<? extends Runnable> clientRunnable;
91     private Set<String> serverCaps;
92
93     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
94         this.nettyThreads = nettyThreads;
95         this.clientRunnable = clientRunnable;
96         this.serverCaps = serverCaps;
97     }
98
99     @Parameterized.Parameters()
100     public static Collection<Object[]> data() {
101         return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
102                                             {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
103                                             // empty set of capabilities = only base 1.0 netconf capability
104                                             {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
105                                             {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
106                                             {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
107                                             {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
108                                             {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
109         });
110     }
111
112     private EventLoopGroup nettyGroup;
113     private NetconfClientDispatcher netconfClientDispatcher;
114
115     private DefaultCommitNotificationProducer commitNot;
116
117     HashedWheelTimer hashedWheelTimer;
118     private TestingNetconfOperation testingNetconfOperation;
119
120     public static NetconfMonitoringService createMockedMonitoringService() {
121         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
122         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
123         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
124         doReturn(Collections.emptySet()).when(monitoring).getCapabilities();
125         return monitoring;
126     }
127
128     @BeforeClass
129     public static void setUpClientExecutor() {
130         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
131             int i = 1;
132
133             @Override
134             public Thread newThread(final Runnable r) {
135                 Thread thread = new Thread(r);
136                 thread.setName("client-" + i++);
137                 thread.setDaemon(true);
138                 return thread;
139             }
140         });
141     }
142
143     @Before
144     public void setUp() throws Exception {
145         hashedWheelTimer = new HashedWheelTimer();
146         nettyGroup = new NioEventLoopGroup(nettyThreads);
147         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
148
149         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
150
151         testingNetconfOperation = new TestingNetconfOperation();
152         factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
153
154         SessionIdProvider idProvider = new SessionIdProvider();
155
156         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
157                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService(), serverCaps);
158
159         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
160
161         NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcherImpl.ServerChannelInitializer(serverNegotiatorFactory);
162         final NetconfServerDispatcherImpl dispatch = new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
163
164         ChannelFuture s = dispatch.createServer(netconfAddress);
165         s.await();
166     }
167
168     @After
169     public void tearDown(){
170         commitNot.close();
171         hashedWheelTimer.stop();
172         try {
173             nettyGroup.shutdownGracefully().get();
174         } catch (InterruptedException | ExecutionException e) {
175             LOG.warn("Ignoring exception while cleaning up after test", e);
176         }
177     }
178
179     @AfterClass
180     public static void tearDownClientExecutor() {
181         clientExecutor.shutdownNow();
182     }
183
184     @Test(timeout = CONCURRENCY * 1000)
185     public void testConcurrentClients() throws Exception {
186
187         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
188
189         for (int i = 0; i < CONCURRENCY; i++) {
190             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
191         }
192
193         for (Future<?> future : futures) {
194             try {
195                 future.get();
196             } catch (InterruptedException e) {
197                 throw new IllegalStateException(e);
198             } catch (ExecutionException e) {
199                 LOG.error("Thread for testing client failed", e);
200                 fail("Client failed: " + e.getMessage());
201             }
202         }
203
204         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
205     }
206
207     public static Set<String> getOnlyExiServerCaps() {
208         return Sets.newHashSet(
209                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
210                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
211         );
212     }
213
214     public static Set<String> getOnlyChunkServerCaps() {
215         return Sets.newHashSet(
216                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
217                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
218         );
219     }
220
221     public Runnable getInstanceOfClientRunnable() throws Exception {
222         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
223     }
224
225     /**
226      * Responds to all operations except start-exi and counts all requests
227      */
228     private static class TestingNetconfOperation implements NetconfOperation {
229
230         private final AtomicLong counter = new AtomicLong();
231
232         @Override
233         public HandlingPriority canHandle(Document message) {
234             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
235                     HandlingPriority.CANNOT_HANDLE :
236                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
237         }
238
239         @Override
240         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
241             try {
242                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
243                 counter.getAndIncrement();
244                 return XmlUtil.readXmlToDocument("<test/>");
245             } catch (Exception e) {
246                 throw new RuntimeException(e);
247             }
248         }
249
250         public long getMessageCount() {
251             return counter.get();
252         }
253     }
254
255     /**
256      * Hardcoded operation service factory
257      */
258     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
259         private final NetconfOperation[] operations;
260
261         public TestingOperationServiceFactory(final NetconfOperation... operations) {
262             this.operations = operations;
263         }
264
265         @Override
266         public Set<Capability> getCapabilities() {
267             return Collections.emptySet();
268         }
269
270         @Override
271         public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
272             return new AutoCloseable(){
273                 @Override
274                 public void close() throws Exception {}
275             };
276         }
277
278         @Override
279         public NetconfOperationService createService(String netconfSessionIdForReporting) {
280             return new NetconfOperationService() {
281
282                 @Override
283                 public Set<NetconfOperation> getNetconfOperations() {
284                     return Sets.newHashSet(operations);
285                 }
286
287                 @Override
288                 public void close() {}
289             };
290         }
291     }
292
293     /**
294      * Pure socket based blocking client
295      */
296     public final class BlockingClientRunnable implements Runnable {
297
298         @Override
299         public void run() {
300             try {
301                 run2();
302             } catch (Exception e) {
303                 throw new IllegalStateException(Thread.currentThread().getName(), e);
304             }
305         }
306
307         private void run2() throws Exception {
308             InputStream clientHello = checkNotNull(XmlFileLoader
309                     .getResourceAsStream("netconfMessages/client_hello.xml"));
310             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
311
312             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
313             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
314             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
315
316             StringBuffer sb = new StringBuffer();
317             while (sb.toString().endsWith("]]>]]>") == false) {
318                 sb.append((char) inFromServer.read());
319             }
320             LOG.info(sb.toString());
321
322             outToServer.write(IOUtils.toByteArray(clientHello));
323             outToServer.write("]]>]]>".getBytes());
324             outToServer.flush();
325             // Thread.sleep(100);
326             outToServer.write(IOUtils.toByteArray(getConfig));
327             outToServer.write("]]>]]>".getBytes());
328             outToServer.flush();
329             Thread.sleep(100);
330             sb = new StringBuffer();
331             while (sb.toString().endsWith("]]>]]>") == false) {
332                 sb.append((char) inFromServer.read());
333             }
334             LOG.info(sb.toString());
335             clientSocket.close();
336         }
337     }
338
339     /**
340      * TestingNetconfClient based runnable
341      */
342     public final class TestingNetconfClientRunnable implements Runnable {
343
344         @Override
345         public void run() {
346             try {
347                 final TestingNetconfClient netconfClient =
348                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
349                 long sessionId = netconfClient.getSessionId();
350                 LOG.info("Client with session id {}: hello exchanged", sessionId);
351
352                 final NetconfMessage getMessage = XmlFileLoader
353                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
354                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
355                 LOG.info("Client with session id {}: got result {}", sessionId, result);
356
357                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
358                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
359                                 + XmlUtil.toString(getMessage.getDocument()));
360
361                 netconfClient.close();
362                 LOG.info("Client with session id {}: ended", sessionId);
363             } catch (final Exception e) {
364                 throw new IllegalStateException(Thread.currentThread().getName(), e);
365             }
366         }
367
368         private NetconfClientConfiguration getClientConfig() {
369             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
370             b.withAddress(netconfAddress);
371             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
372                     "client"));
373             b.withSessionListener(new SimpleNetconfClientSessionListener());
374             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
375                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
376             return b.build();
377         }
378     }
379 }