Upgrade ietf-{inet,yang}-types to 2013-07-15
[netconf.git] / netconf / netconf-impl / src / test / java / org / opendaylight / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.collect.Sets;
22 import com.google.common.io.ByteStreams;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.nio.NioEventLoopGroup;
26 import io.netty.util.HashedWheelTimer;
27 import io.netty.util.concurrent.GlobalEventExecutor;
28 import java.io.DataOutputStream;
29 import java.io.InputStream;
30 import java.io.InputStreamReader;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Set;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadFactory;
43 import java.util.concurrent.atomic.AtomicLong;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.junit.runners.Parameterized;
51 import org.opendaylight.controller.config.util.capability.Capability;
52 import org.opendaylight.controller.config.util.xml.DocumentedException;
53 import org.opendaylight.controller.config.util.xml.XmlUtil;
54 import org.opendaylight.netconf.api.monitoring.SessionEvent;
55 import org.opendaylight.netconf.api.monitoring.SessionListener;
56 import org.opendaylight.netconf.api.NetconfMessage;
57 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
58 import org.opendaylight.netconf.api.monitoring.CapabilityListener;
59 import org.opendaylight.netconf.api.monitoring.NetconfMonitoringService;
60 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
61 import org.opendaylight.netconf.client.NetconfClientDispatcher;
62 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
63 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
64 import org.opendaylight.netconf.client.TestingNetconfClient;
65 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
66 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
67 import org.opendaylight.netconf.impl.osgi.AggregatedNetconfOperationServiceFactory;
68 import org.opendaylight.netconf.mapping.api.HandlingPriority;
69 import org.opendaylight.netconf.mapping.api.NetconfOperation;
70 import org.opendaylight.netconf.mapping.api.NetconfOperationChainedExecution;
71 import org.opendaylight.netconf.mapping.api.NetconfOperationService;
72 import org.opendaylight.netconf.mapping.api.NetconfOperationServiceFactory;
73 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
74 import org.opendaylight.netconf.util.messages.NetconfMessageUtil;
75 import org.opendaylight.netconf.util.test.XmlFileLoader;
76 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
78 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81 import org.w3c.dom.Document;
82
83 @RunWith(Parameterized.class)
84 public class ConcurrentClientsTest {
85     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
86
87     private static ExecutorService clientExecutor;
88
89     private static final int CONCURRENCY = 32;
90     private static final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
91
92     private int nettyThreads;
93     private Class<? extends Runnable> clientRunnable;
94     private Set<String> serverCaps;
95
96     public ConcurrentClientsTest(int nettyThreads, Class<? extends Runnable> clientRunnable, Set<String> serverCaps) {
97         this.nettyThreads = nettyThreads;
98         this.clientRunnable = clientRunnable;
99         this.serverCaps = serverCaps;
100     }
101
102     @Parameterized.Parameters()
103     public static Collection<Object[]> data() {
104         return Arrays.asList(new Object[][]{{4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
105                                             {1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
106                                             // empty set of capabilities = only base 1.0 netconf capability
107                                             {4, TestingNetconfClientRunnable.class, Collections.emptySet()},
108                                             {4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
109                                             {4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
110                                             {4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
111                                             {1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
112         });
113     }
114
115     private EventLoopGroup nettyGroup;
116     private NetconfClientDispatcher netconfClientDispatcher;
117
118     HashedWheelTimer hashedWheelTimer;
119     private TestingNetconfOperation testingNetconfOperation;
120
121     public static NetconfMonitoringService createMockedMonitoringService() {
122         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
123         final SessionListener sessionListener = mock(SessionListener.class);
124         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
125         doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
126         doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
127         doReturn(new AutoCloseable() {
128             @Override
129             public void close() throws Exception {
130
131             }
132         }).when(monitoring).registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
133         doReturn(sessionListener).when(monitoring).getSessionListener();
134         doReturn(new CapabilitiesBuilder().setCapability(Collections.<Uri>emptyList()).build()).when(monitoring).getCapabilities();
135         return monitoring;
136     }
137
138     @BeforeClass
139     public static void setUpClientExecutor() {
140         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
141             int i = 1;
142
143             @Override
144             public Thread newThread(final Runnable r) {
145                 Thread thread = new Thread(r);
146                 thread.setName("client-" + i++);
147                 thread.setDaemon(true);
148                 return thread;
149             }
150         });
151     }
152
153     @Before
154     public void setUp() throws Exception {
155         hashedWheelTimer = new HashedWheelTimer();
156         nettyGroup = new NioEventLoopGroup(nettyThreads);
157         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
158
159         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
160
161         testingNetconfOperation = new TestingNetconfOperation();
162         factoriesListener.onAddNetconfOperationServiceFactory(new TestingOperationServiceFactory(testingNetconfOperation));
163
164         SessionIdProvider idProvider = new SessionIdProvider();
165
166         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactoryBuilder()
167                 .setTimer(hashedWheelTimer)
168                 .setAggregatedOpService(factoriesListener)
169                 .setIdProvider(idProvider)
170                 .setConnectionTimeoutMillis(5000)
171                 .setMonitoringService(createMockedMonitoringService())
172                 .setBaseCapabilities(serverCaps)
173                 .build();
174
175         NetconfServerDispatcherImpl.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcherImpl.ServerChannelInitializer(serverNegotiatorFactory);
176         final NetconfServerDispatcherImpl dispatch = new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
177
178         ChannelFuture s = dispatch.createServer(netconfAddress);
179         s.await();
180     }
181
182     @After
183     public void tearDown(){
184         hashedWheelTimer.stop();
185         try {
186             nettyGroup.shutdownGracefully().get();
187         } catch (InterruptedException | ExecutionException e) {
188             LOG.warn("Ignoring exception while cleaning up after test", e);
189         }
190     }
191
192     @AfterClass
193     public static void tearDownClientExecutor() {
194         clientExecutor.shutdownNow();
195     }
196
197     @Test(timeout = CONCURRENCY * 1000)
198     public void testConcurrentClients() throws Exception {
199
200         List<Future<?>> futures = Lists.newArrayListWithCapacity(CONCURRENCY);
201
202         for (int i = 0; i < CONCURRENCY; i++) {
203             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
204         }
205
206         for (Future<?> future : futures) {
207             try {
208                 future.get();
209             } catch (InterruptedException e) {
210                 throw new IllegalStateException(e);
211             } catch (ExecutionException e) {
212                 LOG.error("Thread for testing client failed", e);
213                 throw e;
214             }
215         }
216
217         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
218     }
219
220     public static Set<String> getOnlyExiServerCaps() {
221         return Sets.newHashSet(
222                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
223                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
224         );
225     }
226
227     public static Set<String> getOnlyChunkServerCaps() {
228         return Sets.newHashSet(
229                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
230                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1
231         );
232     }
233
234     public Runnable getInstanceOfClientRunnable() throws Exception {
235         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
236     }
237
238     /**
239      * Responds to all operations except start-exi and counts all requests
240      */
241     private static class TestingNetconfOperation implements NetconfOperation {
242
243         private final AtomicLong counter = new AtomicLong();
244
245         @Override
246         public HandlingPriority canHandle(Document message) {
247             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
248                     HandlingPriority.CANNOT_HANDLE :
249                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
250         }
251
252         @Override
253         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
254             try {
255                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
256                 counter.getAndIncrement();
257                 return XmlUtil.readXmlToDocument("<test/>");
258             } catch (Exception e) {
259                 throw new RuntimeException(e);
260             }
261         }
262
263         public long getMessageCount() {
264             return counter.get();
265         }
266     }
267
268     /**
269      * Hardcoded operation service factory
270      */
271     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
272         private final NetconfOperation[] operations;
273
274         public TestingOperationServiceFactory(final NetconfOperation... operations) {
275             this.operations = operations;
276         }
277
278         @Override
279         public Set<Capability> getCapabilities() {
280             return Collections.emptySet();
281         }
282
283         @Override
284         public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
285             return new AutoCloseable(){
286                 @Override
287                 public void close() throws Exception {}
288             };
289         }
290
291         @Override
292         public NetconfOperationService createService(String netconfSessionIdForReporting) {
293             return new NetconfOperationService() {
294
295                 @Override
296                 public Set<NetconfOperation> getNetconfOperations() {
297                     return Sets.newHashSet(operations);
298                 }
299
300                 @Override
301                 public void close() {}
302             };
303         }
304     }
305
306     /**
307      * Pure socket based blocking client
308      */
309     public final class BlockingClientRunnable implements Runnable {
310
311         @Override
312         public void run() {
313             try {
314                 run2();
315             } catch (Exception e) {
316                 throw new IllegalStateException(Thread.currentThread().getName(), e);
317             }
318         }
319
320         private void run2() throws Exception {
321             InputStream clientHello = checkNotNull(XmlFileLoader
322                     .getResourceAsStream("netconfMessages/client_hello.xml"));
323             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
324
325             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
326             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
327             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
328
329             StringBuffer sb = new StringBuffer();
330             while (sb.toString().endsWith("]]>]]>") == false) {
331                 sb.append((char) inFromServer.read());
332             }
333             LOG.info(sb.toString());
334
335             outToServer.write(ByteStreams.toByteArray(clientHello));
336             outToServer.write("]]>]]>".getBytes());
337             outToServer.flush();
338             // Thread.sleep(100);
339             outToServer.write(ByteStreams.toByteArray(getConfig));
340             outToServer.write("]]>]]>".getBytes());
341             outToServer.flush();
342             Thread.sleep(100);
343             sb = new StringBuffer();
344             while (sb.toString().endsWith("]]>]]>") == false) {
345                 sb.append((char) inFromServer.read());
346             }
347             LOG.info(sb.toString());
348             clientSocket.close();
349         }
350     }
351
352     /**
353      * TestingNetconfClient based runnable
354      */
355     public final class TestingNetconfClientRunnable implements Runnable {
356
357         @Override
358         public void run() {
359             try {
360                 final TestingNetconfClient netconfClient =
361                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher, getClientConfig());
362                 long sessionId = netconfClient.getSessionId();
363                 LOG.info("Client with session id {}: hello exchanged", sessionId);
364
365                 final NetconfMessage getMessage = XmlFileLoader
366                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
367                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
368                 LOG.info("Client with session id {}: got result {}", sessionId, result);
369
370                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
371                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
372                                 + XmlUtil.toString(getMessage.getDocument()));
373
374                 netconfClient.close();
375                 LOG.info("Client with session id {}: ended", sessionId);
376             } catch (final Exception e) {
377                 throw new IllegalStateException(Thread.currentThread().getName(), e);
378             }
379         }
380
381         private NetconfClientConfiguration getClientConfig() {
382             final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
383             b.withAddress(netconfAddress);
384             b.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp",
385                     "client"));
386             b.withSessionListener(new SimpleNetconfClientSessionListener());
387             b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE,
388                     NetconfClientConfigurationBuilder.DEFAULT_CONNECTION_TIMEOUT_MILLIS));
389             return b.build();
390         }
391     }
392 }