BUG-848 Fix netconf communication while using CHUNK encoding
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
17
18 import com.google.common.base.Preconditions;
19 import java.io.DataOutputStream;
20 import java.io.InputStream;
21 import java.io.InputStreamReader;
22 import java.lang.management.ManagementFactory;
23 import java.net.InetSocketAddress;
24 import java.net.Socket;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Set;
29
30 import java.util.concurrent.atomic.AtomicLong;
31 import org.apache.commons.io.IOUtils;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Ignore;
35 import org.junit.Test;
36 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
37 import org.opendaylight.controller.netconf.api.NetconfMessage;
38 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
39 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
40 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
41 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
42 import org.opendaylight.controller.netconf.mapping.api.Capability;
43 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
45 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
46 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
47 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
48 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
49 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
50 import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
51 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
52 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.w3c.dom.Document;
56
57 import com.google.common.base.Optional;
58 import com.google.common.collect.Sets;
59
60 import io.netty.channel.ChannelFuture;
61 import io.netty.channel.EventLoopGroup;
62 import io.netty.channel.nio.NioEventLoopGroup;
63 import io.netty.util.HashedWheelTimer;
64
65 public class ConcurrentClientsTest {
66
67     private static final int CONCURRENCY = 64;
68     public static final int NETTY_THREADS = 4;
69
70     private EventLoopGroup nettyGroup;
71     private NetconfClientDispatcher netconfClientDispatcher;
72
73     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
74
75     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
76
77     private DefaultCommitNotificationProducer commitNot;
78
79     HashedWheelTimer hashedWheelTimer;
80     private TestingNetconfOperation testingNetconfOperation;
81
82     public static SessionMonitoringService createMockedMonitoringService() {
83         SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
84         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
85         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
86         return monitoring;
87     }
88
89     // TODO refactor and test with different configurations
90
91     @Before
92     public void setUp() throws Exception {
93
94         nettyGroup = new NioEventLoopGroup(NETTY_THREADS);
95         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
96         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
97
98         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
99         testingNetconfOperation = new TestingNetconfOperation();
100         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF(testingNetconfOperation));
101
102         SessionIdProvider idProvider = new SessionIdProvider();
103         hashedWheelTimer = new HashedWheelTimer();
104
105         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
106                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
107
108         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
109
110         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
111         final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
112
113         ChannelFuture s = dispatch.createServer(netconfAddress);
114         s.await();
115     }
116
117     @After
118     public void tearDown(){
119         hashedWheelTimer.stop();
120         nettyGroup.shutdownGracefully();
121     }
122
123     private NetconfOperationServiceFactory mockOpF(final NetconfOperation... operations) {
124         return new TestingOperationServiceFactory(operations);
125     }
126
127     @After
128     public void cleanUp() throws Exception {
129         commitNot.close();
130     }
131
132     @Test(timeout = 30 * 1000)
133     public void multipleClients() throws Exception {
134         List<TestingThread> threads = new ArrayList<>();
135
136         final int attempts = 5;
137         for (int i = 0; i < CONCURRENCY; i++) {
138             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
139             threads.add(thread);
140             thread.start();
141         }
142
143         for (TestingThread thread : threads) {
144             thread.join();
145             if(thread.thrownException.isPresent()) {
146                 Exception exception = thread.thrownException.get();
147                 logger.error("Thread for testing client failed", exception);
148                 fail("Client thread " + thread + " failed: " + exception.getMessage());
149             }
150         }
151
152         assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
153     }
154
155     /**
156      * Cannot handle CHUNK, make server configurable
157      */
158     @Ignore
159     @Test(timeout = 30 * 1000)
160     public void synchronizationTest() throws Exception {
161         new BlockingThread("foo").run2();
162     }
163
164     /**
165      * Cannot handle CHUNK, make server configurable
166      */
167     @Ignore
168     @Test(timeout = 30 * 1000)
169     public void multipleBlockingClients() throws Exception {
170         List<BlockingThread> threads = new ArrayList<>();
171         for (int i = 0; i < CONCURRENCY; i++) {
172             BlockingThread thread = new BlockingThread(String.valueOf(i));
173             threads.add(thread);
174             thread.start();
175         }
176
177         for (BlockingThread thread : threads) {
178             thread.join();
179             if(thread.thrownException.isPresent()) {
180                 Exception exception = thread.thrownException.get();
181                 logger.error("Thread for testing client failed", exception);
182                 fail("Client thread " + thread + " failed: " + exception.getMessage());
183             }
184         }
185     }
186
187     private static class TestingNetconfOperation implements NetconfOperation {
188
189         private final AtomicLong counter = new AtomicLong();
190
191         @Override
192         public HandlingPriority canHandle(Document message) {
193             return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
194                     HandlingPriority.CANNOT_HANDLE :
195                     HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
196         }
197
198         @Override
199         public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
200             try {
201                 logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
202                 counter.getAndIncrement();
203                 return XmlUtil.readXmlToDocument("<test/>");
204             } catch (Exception e) {
205                 throw new RuntimeException(e);
206             }
207         }
208
209         public long getMessageCount() {
210             return counter.get();
211         }
212     }
213
214     private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
215         private final NetconfOperation[] operations;
216
217         public TestingOperationServiceFactory(final NetconfOperation... operations) {
218             this.operations = operations;
219         }
220
221         @Override
222         public NetconfOperationService createService(String netconfSessionIdForReporting) {
223             return new NetconfOperationService() {
224                 @Override
225                 public Set<Capability> getCapabilities() {
226                     return Collections.emptySet();
227                 }
228
229                 @Override
230                 public Set<NetconfOperation> getNetconfOperations() {
231                     return Sets.<NetconfOperation> newHashSet(operations);
232                 }
233
234                 @Override
235                 public void close() {
236                 }
237             };
238         }
239     }
240
241     class BlockingThread extends Thread {
242         private Optional<Exception> thrownException;
243
244         public BlockingThread(String name) {
245             super("client-" + name);
246         }
247
248         @Override
249         public void run() {
250             try {
251                 run2();
252                 thrownException = Optional.absent();
253             } catch (Exception e) {
254                 thrownException = Optional.of(e);
255             }
256         }
257
258         private void run2() throws Exception {
259             InputStream clientHello = checkNotNull(XmlFileLoader
260                     .getResourceAsStream("netconfMessages/client_hello.xml"));
261             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
262
263             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
264             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
265             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
266
267             StringBuffer sb = new StringBuffer();
268             while (sb.toString().endsWith("]]>]]>") == false) {
269                 sb.append((char) inFromServer.read());
270             }
271             logger.info(sb.toString());
272
273             outToServer.write(IOUtils.toByteArray(clientHello));
274             outToServer.write("]]>]]>".getBytes());
275             outToServer.flush();
276             // Thread.sleep(100);
277             outToServer.write(IOUtils.toByteArray(getConfig));
278             outToServer.write("]]>]]>".getBytes());
279             outToServer.flush();
280             Thread.sleep(100);
281             sb = new StringBuffer();
282             while (sb.toString().endsWith("]]>]]>") == false) {
283                 sb.append((char) inFromServer.read());
284             }
285             logger.info(sb.toString());
286             clientSocket.close();
287         }
288     }
289
290     class TestingThread extends Thread {
291
292         private final String clientId;
293         private final int attempts;
294         private Optional<Exception> thrownException;
295
296         TestingThread(String clientId, int attempts) {
297             this.clientId = clientId;
298             this.attempts = attempts;
299             setName("client-" + clientId);
300         }
301
302         @Override
303         public void run() {
304             try {
305                 final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher);
306                 long sessionId = netconfClient.getSessionId();
307                 logger.info("Client with sessionid {} hello exchanged", sessionId);
308
309                 final NetconfMessage getMessage = XmlFileLoader
310                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
311                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
312                 logger.info("Client with sessionid {} got result {}", sessionId, result);
313
314                 Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
315                         "Received error response: " + XmlUtil.toString(result.getDocument()) +
316                                 " to request: " + XmlUtil.toString(getMessage.getDocument()));
317
318                 netconfClient.close();
319                 logger.info("Client with session id {} ended", sessionId);
320                 thrownException = Optional.absent();
321             } catch (final Exception e) {
322                 thrownException = Optional.of(e);
323             }
324         }
325     }
326 }