Get rid of netconf operation filters
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17
18 import java.io.DataOutputStream;
19 import java.io.InputStream;
20 import java.io.InputStreamReader;
21 import java.lang.management.ManagementFactory;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Set;
28
29 import javax.management.ObjectName;
30
31 import org.apache.commons.io.IOUtils;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
38 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
39 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
40 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
41 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
42 import org.opendaylight.controller.netconf.api.NetconfMessage;
43 import org.opendaylight.controller.netconf.client.NetconfClient;
44 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
45 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
46 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
47 import org.opendaylight.controller.netconf.mapping.api.Capability;
48 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
49 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
50 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
51 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
52 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
53 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
54 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
55 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.w3c.dom.Document;
59
60 import com.google.common.base.Optional;
61 import com.google.common.collect.Sets;
62
63 import io.netty.channel.ChannelFuture;
64 import io.netty.channel.EventLoopGroup;
65 import io.netty.channel.nio.NioEventLoopGroup;
66 import io.netty.util.HashedWheelTimer;
67
68 public class ConcurrentClientsTest {
69
70     private static final int CONCURRENCY = 16;
71     private EventLoopGroup nettyGroup;
72     private NetconfClientDispatcher netconfClientDispatcher;
73
74     @Mock
75     private YangStoreService yangStoreService;
76     @Mock
77     private ConfigRegistryJMXClient jmxClient;
78
79     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
80
81     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
82
83     private DefaultCommitNotificationProducer commitNot;
84     private NetconfServerDispatcher dispatch;
85
86     @Mock
87     private SessionMonitoringService monitoring;
88
89     HashedWheelTimer hashedWheelTimer;
90
91     @Before
92     public void setUp() throws Exception {
93         { // init mocks
94             MockitoAnnotations.initMocks(this);
95             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
96             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
97             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
98
99             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
100             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
101
102             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
103         }
104
105         nettyGroup = new NioEventLoopGroup();
106         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
107         netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
108
109         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
110         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
111
112         SessionIdProvider idProvider = new SessionIdProvider();
113         hashedWheelTimer = new HashedWheelTimer();
114         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
115                 hashedWheelTimer, factoriesListener, idProvider, 5000);
116
117         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
118
119         doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
120         doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
121
122         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
123                 factoriesListener, commitNot, idProvider, monitoring);
124         NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
125         dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
126
127         ChannelFuture s = dispatch.createServer(netconfAddress);
128         s.await();
129     }
130
131     @After
132     public void tearDown(){
133         hashedWheelTimer.stop();
134         nettyGroup.shutdownGracefully();
135     }
136
137     private NetconfOperationServiceFactory mockOpF() {
138         return new NetconfOperationServiceFactory() {
139             @Override
140             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
141                 return new NetconfOperationService() {
142                     @Override
143                     public Set<Capability> getCapabilities() {
144                         return Collections.emptySet();
145                     }
146
147                     @Override
148                     public Set<NetconfOperation> getNetconfOperations() {
149                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
150                             @Override
151                             public HandlingPriority canHandle(Document message) {
152                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
153                             }
154
155                             @Override
156                             public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
157                                 try {
158                                     return XmlUtil.readXmlToDocument("<test/>");
159                                 } catch (Exception e) {
160                                     throw new RuntimeException(e);
161                                 }
162                             }
163                         });
164                     }
165
166                     @Override
167                     public void close() {
168                     }
169                 };
170             }
171         };
172     }
173
174     @After
175     public void cleanUp() throws Exception {
176         commitNot.close();
177     }
178
179     @Test
180     public void multipleClients() throws Exception {
181         List<TestingThread> threads = new ArrayList<>();
182
183         final int attempts = 5;
184         for (int i = 0; i < CONCURRENCY; i++) {
185             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
186             threads.add(thread);
187             thread.start();
188         }
189
190         for (TestingThread thread : threads) {
191             thread.join();
192             if(thread.thrownException.isPresent()) {
193                 Exception exception = thread.thrownException.get();
194                 logger.error("Thread for testing client failed", exception);
195                 fail("Client thread " + thread + " failed: " + exception.getMessage());
196             }
197         }
198     }
199
200     @Test
201     public void synchronizationTest() throws Exception {
202         new BlockingThread("foo").run2();
203     }
204
205     @Test
206     public void multipleBlockingClients() throws Exception {
207         List<BlockingThread> threads = new ArrayList<>();
208         for (int i = 0; i < CONCURRENCY; i++) {
209             BlockingThread thread = new BlockingThread(String.valueOf(i));
210             threads.add(thread);
211             thread.start();
212         }
213
214         for (BlockingThread thread : threads) {
215             thread.join();
216             if(thread.thrownException.isPresent()) {
217                 Exception exception = thread.thrownException.get();
218                 logger.error("Thread for testing client failed", exception);
219                 fail("Client thread " + thread + " failed: " + exception.getMessage());
220             }
221         }
222     }
223
224     class BlockingThread extends Thread {
225         private Optional<Exception> thrownException;
226
227         public BlockingThread(String name) {
228             super("client-" + name);
229         }
230
231         @Override
232         public void run() {
233             try {
234                 run2();
235                 thrownException = Optional.absent();
236             } catch (Exception e) {
237                 thrownException = Optional.of(e);
238             }
239         }
240
241         private void run2() throws Exception {
242             InputStream clientHello = checkNotNull(XmlFileLoader
243                     .getResourceAsStream("netconfMessages/client_hello.xml"));
244             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
245
246             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
247             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
248             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
249
250             StringBuffer sb = new StringBuffer();
251             while (sb.toString().endsWith("]]>]]>") == false) {
252                 sb.append((char) inFromServer.read());
253             }
254             logger.info(sb.toString());
255
256             outToServer.write(IOUtils.toByteArray(clientHello));
257             outToServer.write("]]>]]>".getBytes());
258             outToServer.flush();
259             // Thread.sleep(100);
260             outToServer.write(IOUtils.toByteArray(getConfig));
261             outToServer.write("]]>]]>".getBytes());
262             outToServer.flush();
263             Thread.sleep(100);
264             sb = new StringBuffer();
265             while (sb.toString().endsWith("]]>]]>") == false) {
266                 sb.append((char) inFromServer.read());
267             }
268             logger.info(sb.toString());
269             clientSocket.close();
270         }
271     }
272
273     class TestingThread extends Thread {
274
275         private final String clientId;
276         private final int attempts;
277         private Optional<Exception> thrownException;
278
279         TestingThread(String clientId, int attempts) {
280             this.clientId = clientId;
281             this.attempts = attempts;
282             setName("client-" + clientId);
283         }
284
285         @Override
286         public void run() {
287             try {
288                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
289                 long sessionId = netconfClient.getSessionId();
290                 logger.info("Client with sessionid {} hello exchanged", sessionId);
291
292                 final NetconfMessage getMessage = XmlFileLoader
293                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
294                 NetconfMessage result = netconfClient.sendMessage(getMessage);
295                 logger.info("Client with sessionid {} got result {}", sessionId, result);
296                 netconfClient.close();
297                 logger.info("Client with session id {} ended", sessionId);
298                 thrownException = Optional.absent();
299             } catch (final Exception e) {
300                 thrownException = Optional.of(e);
301             }
302         }
303     }
304 }