Rework NETCONF client reconnection
[netconf.git] / protocol / netconf-server / src / test / java / org / opendaylight / netconf / server / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.server;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.io.ByteStreams;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.util.HashedWheelTimer;
23 import java.io.DataOutputStream;
24 import java.io.InputStream;
25 import java.io.InputStreamReader;
26 import java.net.InetSocketAddress;
27 import java.net.Socket;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.atomic.AtomicLong;
38 import org.junit.After;
39 import org.junit.AfterClass;
40 import org.junit.Before;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.junit.runner.RunWith;
44 import org.junit.runners.Parameterized;
45 import org.opendaylight.netconf.api.CapabilityURN;
46 import org.opendaylight.netconf.api.DocumentedException;
47 import org.opendaylight.netconf.api.NetconfMessage;
48 import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
49 import org.opendaylight.netconf.api.xml.XmlUtil;
50 import org.opendaylight.netconf.client.NetconfClientDispatcher;
51 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
52 import org.opendaylight.netconf.client.NetconfMessageUtil;
53 import org.opendaylight.netconf.client.SimpleNetconfClientSessionListener;
54 import org.opendaylight.netconf.client.TestingNetconfClient;
55 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
56 import org.opendaylight.netconf.client.conf.NetconfClientConfigurationBuilder;
57 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
58 import org.opendaylight.netconf.server.api.SessionIdProvider;
59 import org.opendaylight.netconf.server.api.monitoring.Capability;
60 import org.opendaylight.netconf.server.api.monitoring.CapabilityListener;
61 import org.opendaylight.netconf.server.api.monitoring.NetconfMonitoringService;
62 import org.opendaylight.netconf.server.api.monitoring.SessionEvent;
63 import org.opendaylight.netconf.server.api.monitoring.SessionListener;
64 import org.opendaylight.netconf.server.api.operations.HandlingPriority;
65 import org.opendaylight.netconf.server.api.operations.NetconfOperation;
66 import org.opendaylight.netconf.server.api.operations.NetconfOperationChainedExecution;
67 import org.opendaylight.netconf.server.api.operations.NetconfOperationService;
68 import org.opendaylight.netconf.server.api.operations.NetconfOperationServiceFactory;
69 import org.opendaylight.netconf.server.impl.DefaultSessionIdProvider;
70 import org.opendaylight.netconf.server.osgi.AggregatedNetconfOperationServiceFactory;
71 import org.opendaylight.netconf.test.util.XmlFileLoader;
72 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
73 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.CapabilitiesBuilder;
74 import org.opendaylight.yangtools.concepts.Registration;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import org.w3c.dom.Document;
78
79 @RunWith(Parameterized.class)
80 public class ConcurrentClientsTest {
81     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentClientsTest.class);
82
83     private static ExecutorService clientExecutor;
84
85     private static final int CONCURRENCY = 32;
86     private static final InetSocketAddress NETCONF_ADDRESS = new InetSocketAddress("127.0.0.1", 8303);
87
88     private final int nettyThreads;
89     private final Class<? extends Runnable> clientRunnable;
90     private final Set<String> serverCaps;
91
92     public ConcurrentClientsTest(final int nettyThreads, final Class<? extends Runnable> clientRunnable,
93             final Set<String> serverCaps) {
94         this.nettyThreads = nettyThreads;
95         this.clientRunnable = clientRunnable;
96         this.serverCaps = serverCaps;
97     }
98
99     @Parameterized.Parameters()
100     public static Collection<Object[]> data() {
101         return List.of(new Object[][]{
102             { 4, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
103             { 1, TestingNetconfClientRunnable.class, NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES},
104             // empty set of capabilities = only base 1.0 netconf capability
105             { 4, TestingNetconfClientRunnable.class, Set.of()},
106             { 4, TestingNetconfClientRunnable.class, getOnlyExiServerCaps()},
107             { 4, TestingNetconfClientRunnable.class, getOnlyChunkServerCaps()},
108             { 4, BlockingClientRunnable.class, getOnlyExiServerCaps()},
109             { 1, BlockingClientRunnable.class, getOnlyExiServerCaps()},
110         });
111     }
112
113     private EventLoopGroup nettyGroup;
114     private NetconfClientDispatcher netconfClientDispatcher;
115
116     HashedWheelTimer hashedWheelTimer;
117     private TestingNetconfOperation testingNetconfOperation;
118
119     public static NetconfMonitoringService createMockedMonitoringService() {
120         NetconfMonitoringService monitoring = mock(NetconfMonitoringService.class);
121         final SessionListener sessionListener = mock(SessionListener.class);
122         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
123         doNothing().when(sessionListener).onSessionDown(any(NetconfServerSession.class));
124         doNothing().when(sessionListener).onSessionEvent(any(SessionEvent.class));
125         doReturn((Registration) () -> { }).when(monitoring)
126             .registerCapabilitiesListener(any(NetconfMonitoringService.CapabilitiesListener.class));
127         doReturn(sessionListener).when(monitoring).getSessionListener();
128         doReturn(new CapabilitiesBuilder().setCapability(Set.of()).build()).when(monitoring).getCapabilities();
129         return monitoring;
130     }
131
132     @BeforeClass
133     public static void setUpClientExecutor() {
134         clientExecutor = Executors.newFixedThreadPool(CONCURRENCY, new ThreadFactory() {
135             int index = 1;
136
137             @Override
138             public Thread newThread(final Runnable runnable) {
139                 Thread thread = new Thread(runnable);
140                 thread.setName("client-" + index++);
141                 thread.setDaemon(true);
142                 return thread;
143             }
144         });
145     }
146
147     @Before
148     public void setUp() throws Exception {
149         hashedWheelTimer = new HashedWheelTimer();
150         nettyGroup = new NioEventLoopGroup(nettyThreads);
151         netconfClientDispatcher = new NetconfClientDispatcherImpl(nettyGroup, nettyGroup, hashedWheelTimer);
152
153         AggregatedNetconfOperationServiceFactory factoriesListener = new AggregatedNetconfOperationServiceFactory();
154
155         testingNetconfOperation = new TestingNetconfOperation();
156         factoriesListener.onAddNetconfOperationServiceFactory(
157                 new TestingOperationServiceFactory(testingNetconfOperation));
158
159         SessionIdProvider idProvider = new DefaultSessionIdProvider();
160
161         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new
162                 NetconfServerSessionNegotiatorFactoryBuilder()
163                 .setTimer(hashedWheelTimer)
164                 .setAggregatedOpService(factoriesListener)
165                 .setIdProvider(idProvider)
166                 .setConnectionTimeoutMillis(5000)
167                 .setMonitoringService(createMockedMonitoringService())
168                 .setBaseCapabilities(serverCaps)
169                 .build();
170
171         ServerChannelInitializer serverChannelInitializer =
172                 new ServerChannelInitializer(serverNegotiatorFactory);
173         final NetconfServerDispatcherImpl dispatch =
174                 new NetconfServerDispatcherImpl(serverChannelInitializer, nettyGroup, nettyGroup);
175
176         ChannelFuture server = dispatch.createServer(NETCONF_ADDRESS);
177         server.await();
178     }
179
180     @After
181     public void tearDown() {
182         hashedWheelTimer.stop();
183         try {
184             nettyGroup.shutdownGracefully().get();
185         } catch (InterruptedException | ExecutionException e) {
186             LOG.warn("Ignoring exception while cleaning up after test", e);
187         }
188     }
189
190     @AfterClass
191     public static void tearDownClientExecutor() {
192         clientExecutor.shutdownNow();
193     }
194
195     @Test(timeout = CONCURRENCY * 1000)
196     public void testConcurrentClients() throws Exception {
197
198         List<Future<?>> futures = new ArrayList<>(CONCURRENCY);
199
200         for (int i = 0; i < CONCURRENCY; i++) {
201             futures.add(clientExecutor.submit(getInstanceOfClientRunnable()));
202         }
203
204         for (Future<?> future : futures) {
205             try {
206                 future.get();
207             } catch (InterruptedException e) {
208                 throw new IllegalStateException(e);
209             } catch (ExecutionException e) {
210                 LOG.error("Thread for testing client failed", e);
211                 throw e;
212             }
213         }
214
215         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
216     }
217
218     public static Set<String> getOnlyExiServerCaps() {
219         return Set.of(CapabilityURN.BASE, CapabilityURN.EXI);
220     }
221
222     public static Set<String> getOnlyChunkServerCaps() {
223         return Set.of(CapabilityURN.BASE, CapabilityURN.BASE_1_1);
224     }
225
226     public Runnable getInstanceOfClientRunnable() throws Exception {
227         return clientRunnable.getConstructor(ConcurrentClientsTest.class).newInstance(this);
228     }
229
230     /**
231      * Responds to all operations except start-exi and counts all requests.
232      */
233     private static class TestingNetconfOperation implements NetconfOperation {
234
235         private final AtomicLong counter = new AtomicLong();
236
237         @Override
238         public HandlingPriority canHandle(final Document message) {
239             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI)
240                     ? HandlingPriority.CANNOT_HANDLE :
241                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
242         }
243
244         @SuppressWarnings("checkstyle:IllegalCatch")
245         @Override
246         public Document handle(final Document requestMessage,
247                 final NetconfOperationChainedExecution subsequentOperation) throws DocumentedException {
248             try {
249                 LOG.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
250                 counter.getAndIncrement();
251                 return XmlUtil.readXmlToDocument("<test/>");
252             } catch (Exception e) {
253                 throw new RuntimeException(e);
254             }
255         }
256
257         public long getMessageCount() {
258             return counter.get();
259         }
260     }
261
262     /**
263      * Hardcoded operation service factory.
264      */
265     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
266         private final NetconfOperation[] operations;
267
268         TestingOperationServiceFactory(final NetconfOperation... operations) {
269             this.operations = operations;
270         }
271
272         @Override
273         public Set<Capability> getCapabilities() {
274             return Set.of();
275         }
276
277         @Override
278         public Registration registerCapabilityListener(final CapabilityListener listener) {
279             // No-op
280             return () -> { };
281         }
282
283         @Override
284         public NetconfOperationService createService(final SessionIdType sessionId) {
285             return new NetconfOperationService() {
286
287                 @Override
288                 public Set<NetconfOperation> getNetconfOperations() {
289                     return Set.of(operations);
290                 }
291
292                 @Override
293                 public void close() {
294                 }
295             };
296         }
297     }
298
299     /**
300      * Pure socket based blocking client.
301      */
302     public final class BlockingClientRunnable implements Runnable {
303
304         @Override
305         @SuppressWarnings("checkstyle:IllegalCatch")
306         public void run() {
307             try {
308                 run2();
309             } catch (Exception e) {
310                 throw new IllegalStateException(Thread.currentThread().getName(), e);
311             }
312         }
313
314         private void run2() throws Exception {
315             InputStream clientHello = requireNonNull(XmlFileLoader.getResourceAsStream(
316                 "netconfMessages/client_hello.xml"));
317             final InputStream getConfig = requireNonNull(XmlFileLoader.getResourceAsStream(
318                 "netconfMessages/getConfig.xml"));
319
320             Socket clientSocket = new Socket(NETCONF_ADDRESS.getHostString(), NETCONF_ADDRESS.getPort());
321             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
322             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
323
324             StringBuilder sb = new StringBuilder();
325             while (!sb.toString().endsWith("]]>]]>")) {
326                 sb.append((char) inFromServer.read());
327             }
328             LOG.info(sb.toString());
329
330             outToServer.write(ByteStreams.toByteArray(clientHello));
331             outToServer.write("]]>]]>".getBytes());
332             outToServer.flush();
333             // Thread.sleep(100);
334             outToServer.write(ByteStreams.toByteArray(getConfig));
335             outToServer.write("]]>]]>".getBytes());
336             outToServer.flush();
337             Thread.sleep(100);
338             sb = new StringBuilder();
339             while (!sb.toString().endsWith("]]>]]>")) {
340                 sb.append((char) inFromServer.read());
341             }
342             LOG.info(sb.toString());
343             clientSocket.close();
344         }
345     }
346
347     /**
348      * TestingNetconfClient based runnable.
349      */
350     public final class TestingNetconfClientRunnable implements Runnable {
351
352         @SuppressWarnings("checkstyle:IllegalCatch")
353         @Override
354         public void run() {
355             try {
356                 final TestingNetconfClient netconfClient =
357                         new TestingNetconfClient(Thread.currentThread().getName(), netconfClientDispatcher,
358                                 getClientConfig());
359                 final var sessionId = netconfClient.sessionId();
360                 LOG.info("Client with session id {}: hello exchanged", sessionId);
361
362                 final NetconfMessage getMessage = XmlFileLoader
363                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
364                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
365                 LOG.info("Client with session id {}: got result {}", sessionId.getValue(), result);
366
367                 checkState(NetconfMessageUtil.isErrorMessage(result) == false,
368                         "Received error response: " + XmlUtil.toString(result.getDocument()) + " to request: "
369                                 + XmlUtil.toString(getMessage.getDocument()));
370
371                 netconfClient.close();
372                 LOG.info("Client with session id {}: ended", sessionId.getValue());
373             } catch (final Exception e) {
374                 throw new IllegalStateException(Thread.currentThread().getName(), e);
375             }
376         }
377
378         private NetconfClientConfiguration getClientConfig() {
379             return NetconfClientConfigurationBuilder.create()
380                 .withAddress(NETCONF_ADDRESS)
381                 .withAdditionalHeader(
382                     new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client"))
383                 .withSessionListener(new SimpleNetconfClientSessionListener())
384                 .build();
385         }
386     }
387 }