Merge "Fixed publishDataChangeEvent in 2phase commit"
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import io.netty.util.HashedWheelTimer;
17 import org.apache.commons.io.IOUtils;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.mockito.MockitoAnnotations;
23 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
24 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
25 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
26 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
27 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
28 import org.opendaylight.controller.netconf.api.NetconfMessage;
29 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
30 import org.opendaylight.controller.netconf.client.NetconfClient;
31 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
32 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
33 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
34 import org.opendaylight.controller.netconf.mapping.api.Capability;
35 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
36 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
37 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
38 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
39 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
40 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
41 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.w3c.dom.Document;
45
46 import javax.management.ObjectName;
47 import java.io.DataOutputStream;
48 import java.io.InputStream;
49 import java.io.InputStreamReader;
50 import java.lang.management.ManagementFactory;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.util.ArrayList;
54 import java.util.Collections;
55 import java.util.List;
56 import java.util.Set;
57
58 import static com.google.common.base.Preconditions.checkNotNull;
59 import static org.junit.Assert.fail;
60 import static org.mockito.Matchers.any;
61 import static org.mockito.Mockito.doNothing;
62 import static org.mockito.Mockito.doReturn;
63 import static org.mockito.Mockito.mock;
64
65 public class ConcurrentClientsTest {
66
67     private static final int CONCURRENCY = 16;
68     private EventLoopGroup nettyGroup;
69     private NetconfClientDispatcher netconfClientDispatcher;
70
71     @Mock
72     private YangStoreService yangStoreService;
73     @Mock
74     private ConfigRegistryJMXClient jmxClient;
75
76     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
77
78     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
79
80     private DefaultCommitNotificationProducer commitNot;
81     private NetconfServerDispatcher dispatch;
82
83     @Mock
84     private SessionMonitoringService monitoring;
85
86     HashedWheelTimer hashedWheelTimer;
87
88     @Before
89     public void setUp() throws Exception {
90         { // init mocks
91             MockitoAnnotations.initMocks(this);
92             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
93             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
94             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
95
96             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
97             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
98
99             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
100         }
101
102         nettyGroup = new NioEventLoopGroup();
103         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, 5000);
104
105         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
106         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
107
108         SessionIdProvider idProvider = new SessionIdProvider();
109         hashedWheelTimer = new HashedWheelTimer();
110         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
111                 hashedWheelTimer, factoriesListener, idProvider, 5000);
112
113         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
114
115         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
116         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
117
118         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
119                 factoriesListener, commitNot, idProvider, monitoring);
120         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
121         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
122
123         ChannelFuture s = dispatch.createServer(netconfAddress);
124         s.await();
125     }
126
127     @After
128     public void tearDown(){
129         hashedWheelTimer.stop();
130         nettyGroup.shutdownGracefully();
131     }
132
133     private NetconfOperationServiceFactory mockOpF() {
134         return new NetconfOperationServiceFactory() {
135             @Override
136             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
137                 return new NetconfOperationService() {
138                     @Override
139                     public Set<Capability> getCapabilities() {
140                         return Collections.emptySet();
141                     }
142
143                     @Override
144                     public Set<NetconfOperation> getNetconfOperations() {
145                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
146                             @Override
147                             public HandlingPriority canHandle(Document message) {
148                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
149                             }
150
151                             @Override
152                             public Document handle(Document message, NetconfOperationRouter operationRouter)
153                                     throws NetconfDocumentedException {
154                                 try {
155                                     return XmlUtil.readXmlToDocument("<test/>");
156                                 } catch (Exception e) {
157                                     throw new RuntimeException(e);
158                                 }
159                             }
160                         });
161                     }
162
163                     @Override
164                     public Set<NetconfOperationFilter> getFilters() {
165                         return Collections.emptySet();
166                     }
167
168                     @Override
169                     public void close() {
170                     }
171                 };
172             }
173         };
174     }
175
176     @After
177     public void cleanUp() throws Exception {
178         commitNot.close();
179     }
180
181     @Test
182     public void multipleClients() throws Exception {
183         List<TestingThread> threads = new ArrayList<>();
184
185         final int attempts = 5;
186         for (int i = 0; i < CONCURRENCY; i++) {
187             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
188             threads.add(thread);
189             thread.start();
190         }
191
192         for (TestingThread thread : threads) {
193             thread.join();
194             if(thread.thrownException.isPresent()) {
195                 Exception exception = thread.thrownException.get();
196                 logger.error("Thread for testing client failed", exception);
197                 fail("Client thread " + thread + " failed: " + exception.getMessage());
198             }
199         }
200     }
201
202     @Test
203     public void synchronizationTest() throws Exception {
204         new BlockingThread("foo").run2();
205     }
206
207     @Test
208     public void multipleBlockingClients() throws Exception {
209         List<BlockingThread> threads = new ArrayList<>();
210         for (int i = 0; i < CONCURRENCY; i++) {
211             BlockingThread thread = new BlockingThread(String.valueOf(i));
212             threads.add(thread);
213             thread.start();
214         }
215
216         for (BlockingThread thread : threads) {
217             thread.join();
218             if(thread.thrownException.isPresent()) {
219                 Exception exception = thread.thrownException.get();
220                 logger.error("Thread for testing client failed", exception);
221                 fail("Client thread " + thread + " failed: " + exception.getMessage());
222             }
223         }
224     }
225
226     class BlockingThread extends Thread {
227         private Optional<Exception> thrownException;
228
229         public BlockingThread(String name) {
230             super("client-" + name);
231         }
232
233         @Override
234         public void run() {
235             try {
236                 run2();
237                 thrownException = Optional.absent();
238             } catch (Exception e) {
239                 thrownException = Optional.of(e);
240             }
241         }
242
243         private void run2() throws Exception {
244             InputStream clientHello = checkNotNull(XmlFileLoader
245                     .getResourceAsStream("netconfMessages/client_hello.xml"));
246             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
247
248             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
249             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
250             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
251
252             StringBuffer sb = new StringBuffer();
253             while (sb.toString().endsWith("]]>]]>") == false) {
254                 sb.append((char) inFromServer.read());
255             }
256             logger.info(sb.toString());
257
258             outToServer.write(IOUtils.toByteArray(clientHello));
259             outToServer.write("]]>]]>".getBytes());
260             outToServer.flush();
261             // Thread.sleep(100);
262             outToServer.write(IOUtils.toByteArray(getConfig));
263             outToServer.write("]]>]]>".getBytes());
264             outToServer.flush();
265             Thread.sleep(100);
266             sb = new StringBuffer();
267             while (sb.toString().endsWith("]]>]]>") == false) {
268                 sb.append((char) inFromServer.read());
269             }
270             logger.info(sb.toString());
271             clientSocket.close();
272         }
273     }
274
275     class TestingThread extends Thread {
276
277         private final String clientId;
278         private final int attempts;
279         private Optional<Exception> thrownException;
280
281         TestingThread(String clientId, int attempts) {
282             this.clientId = clientId;
283             this.attempts = attempts;
284             setName("client-" + clientId);
285         }
286
287         @Override
288         public void run() {
289             try {
290                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
291                 long sessionId = netconfClient.getSessionId();
292                 logger.info("Client with sessionid {} hello exchanged", sessionId);
293
294                 final NetconfMessage getMessage = XmlFileLoader
295                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
296                 NetconfMessage result = netconfClient.sendMessage(getMessage);
297                 logger.info("Client with sessionid {} got result {}", sessionId, result);
298                 netconfClient.close();
299                 logger.info("Client with session id {} ended", sessionId);
300                 thrownException = Optional.absent();
301             } catch (final Exception e) {
302                 thrownException = Optional.of(e);
303             }
304         }
305     }
306 }