Merge "Bug 509: Fixed incorrect merging of Data Store Writes / Events"
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
22 import org.opendaylight.controller.netconf.api.NetconfMessage;
23 import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
24 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
25 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
26 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
27 import org.opendaylight.controller.netconf.mapping.api.Capability;
28 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
29 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
30 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
31 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
32 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
33 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
34 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
35 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.w3c.dom.Document;
39
40 import java.io.DataOutputStream;
41 import java.io.InputStream;
42 import java.io.InputStreamReader;
43 import java.lang.management.ManagementFactory;
44 import java.net.InetSocketAddress;
45 import java.net.Socket;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.List;
49 import java.util.Set;
50
51 import static com.google.common.base.Preconditions.checkNotNull;
52 import static org.junit.Assert.fail;
53 import static org.mockito.Matchers.any;
54 import static org.mockito.Mockito.doNothing;
55 import static org.mockito.Mockito.mock;
56
57 public class ConcurrentClientsTest {
58
59     private static final int CONCURRENCY = 16;
60     private EventLoopGroup nettyGroup;
61     private NetconfClientDispatcher netconfClientDispatcher;
62
63     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
64
65     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
66
67     private DefaultCommitNotificationProducer commitNot;
68     private NetconfServerDispatcher dispatch;
69
70
71
72     HashedWheelTimer hashedWheelTimer;
73
74     public static SessionMonitoringService createMockedMonitoringService() {
75         SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
76         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
77         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
78         return monitoring;
79     }
80
81     @Before
82     public void setUp() throws Exception {
83
84         nettyGroup = new NioEventLoopGroup();
85         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
86         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
87
88         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
89         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
90
91         SessionIdProvider idProvider = new SessionIdProvider();
92         hashedWheelTimer = new HashedWheelTimer();
93         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
94                 hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
95
96         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
97
98
99
100         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
101         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
102
103         ChannelFuture s = dispatch.createServer(netconfAddress);
104         s.await();
105     }
106
107     @After
108     public void tearDown(){
109         hashedWheelTimer.stop();
110         nettyGroup.shutdownGracefully();
111     }
112
113     private NetconfOperationServiceFactory mockOpF() {
114         return new NetconfOperationServiceFactory() {
115             @Override
116             public NetconfOperationService createService(String netconfSessionIdForReporting) {
117                 return new NetconfOperationService() {
118                     @Override
119                     public Set<Capability> getCapabilities() {
120                         return Collections.emptySet();
121                     }
122
123                     @Override
124                     public Set<NetconfOperation> getNetconfOperations() {
125                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
126                             @Override
127                             public HandlingPriority canHandle(Document message) {
128                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
129                             }
130
131                             @Override
132                             public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
133                                 try {
134                                     return XmlUtil.readXmlToDocument("<test/>");
135                                 } catch (Exception e) {
136                                     throw new RuntimeException(e);
137                                 }
138                             }
139                         });
140                     }
141
142                     @Override
143                     public void close() {
144                     }
145                 };
146             }
147         };
148     }
149
150     @After
151     public void cleanUp() throws Exception {
152         commitNot.close();
153     }
154
155     @Test
156     public void multipleClients() throws Exception {
157         List<TestingThread> threads = new ArrayList<>();
158
159         final int attempts = 5;
160         for (int i = 0; i < CONCURRENCY; i++) {
161             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
162             threads.add(thread);
163             thread.start();
164         }
165
166         for (TestingThread thread : threads) {
167             thread.join();
168             if(thread.thrownException.isPresent()) {
169                 Exception exception = thread.thrownException.get();
170                 logger.error("Thread for testing client failed", exception);
171                 fail("Client thread " + thread + " failed: " + exception.getMessage());
172             }
173         }
174     }
175
176     @Test
177     public void synchronizationTest() throws Exception {
178         new BlockingThread("foo").run2();
179     }
180
181     @Test
182     public void multipleBlockingClients() throws Exception {
183         List<BlockingThread> threads = new ArrayList<>();
184         for (int i = 0; i < CONCURRENCY; i++) {
185             BlockingThread thread = new BlockingThread(String.valueOf(i));
186             threads.add(thread);
187             thread.start();
188         }
189
190         for (BlockingThread thread : threads) {
191             thread.join();
192             if(thread.thrownException.isPresent()) {
193                 Exception exception = thread.thrownException.get();
194                 logger.error("Thread for testing client failed", exception);
195                 fail("Client thread " + thread + " failed: " + exception.getMessage());
196             }
197         }
198     }
199
200     class BlockingThread extends Thread {
201         private Optional<Exception> thrownException;
202
203         public BlockingThread(String name) {
204             super("client-" + name);
205         }
206
207         @Override
208         public void run() {
209             try {
210                 run2();
211                 thrownException = Optional.absent();
212             } catch (Exception e) {
213                 thrownException = Optional.of(e);
214             }
215         }
216
217         private void run2() throws Exception {
218             InputStream clientHello = checkNotNull(XmlFileLoader
219                     .getResourceAsStream("netconfMessages/client_hello.xml"));
220             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
221
222             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
223             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
224             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
225
226             StringBuffer sb = new StringBuffer();
227             while (sb.toString().endsWith("]]>]]>") == false) {
228                 sb.append((char) inFromServer.read());
229             }
230             logger.info(sb.toString());
231
232             outToServer.write(IOUtils.toByteArray(clientHello));
233             outToServer.write("]]>]]>".getBytes());
234             outToServer.flush();
235             // Thread.sleep(100);
236             outToServer.write(IOUtils.toByteArray(getConfig));
237             outToServer.write("]]>]]>".getBytes());
238             outToServer.flush();
239             Thread.sleep(100);
240             sb = new StringBuffer();
241             while (sb.toString().endsWith("]]>]]>") == false) {
242                 sb.append((char) inFromServer.read());
243             }
244             logger.info(sb.toString());
245             clientSocket.close();
246         }
247     }
248
249     class TestingThread extends Thread {
250
251         private final String clientId;
252         private final int attempts;
253         private Optional<Exception> thrownException;
254
255         TestingThread(String clientId, int attempts) {
256             this.clientId = clientId;
257             this.attempts = attempts;
258             setName("client-" + clientId);
259         }
260
261         @Override
262         public void run() {
263             try {
264                 final TestingNetconfClient netconfClient = new TestingNetconfClient(clientId, netconfAddress, netconfClientDispatcher);
265                 long sessionId = netconfClient.getSessionId();
266                 logger.info("Client with sessionid {} hello exchanged", sessionId);
267
268                 final NetconfMessage getMessage = XmlFileLoader
269                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
270                 NetconfMessage result = netconfClient.sendRequest(getMessage).get();
271                 logger.info("Client with sessionid {} got result {}", sessionId, result);
272                 netconfClient.close();
273                 logger.info("Client with session id {} ended", sessionId);
274                 thrownException = Optional.absent();
275             } catch (final Exception e) {
276                 thrownException = Optional.of(e);
277             }
278         }
279     }
280 }