Improved handling of exceptions thrown by client threads in ConcurrentClientsTest
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelFuture;
import io.netty.util.HashedWheelTimer;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
import org.opendaylight.controller.config.yang.store.api.YangStoreService;
import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
import org.opendaylight.controller.netconf.client.NetconfClient;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationFilter;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;

import javax.management.ObjectName;
import javax.net.ssl.SSLContext;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
63
64 public class ConcurrentClientsTest {
65
66     private static final int CONCURRENCY = 16;
67     public static final NetconfClientDispatcher NETCONF_CLIENT_DISPATCHER = new NetconfClientDispatcher(Optional.<SSLContext>absent());
68     @Mock
69     private YangStoreService yangStoreService;
70     @Mock
71     private ConfigRegistryJMXClient jmxClient;
72
73     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
74
75     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
76
77     private DefaultCommitNotificationProducer commitNot;
78     private NetconfServerDispatcher dispatch;
79
80     @Before
81     public void setUp() throws Exception {
82         { // init mocks
83             MockitoAnnotations.initMocks(this);
84             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
85             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
86             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
87             doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
88
89             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
90             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
91
92             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
93         }
94
95         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
96         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
97
98         SessionIdProvider idProvider = new SessionIdProvider();
99         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
100                 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
101
102         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
103
104         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
105                 factoriesListener, commitNot, idProvider);
106         dispatch = new NetconfServerDispatcher(Optional.<SSLContext> absent(), serverNegotiatorFactory, listenerFactory);
107
108         ChannelFuture s = dispatch.createServer(netconfAddress);
109         s.await();
110     }
111
112     @AfterClass
113     public static void tearDownStatic() {
114         NETCONF_CLIENT_DISPATCHER.close();
115     }
116
117     private NetconfOperationServiceFactory mockOpF() {
118         return new NetconfOperationServiceFactory() {
119             @Override
120             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
121                 return new NetconfOperationService() {
122                     @Override
123                     public Set<Capability> getCapabilities() {
124                         return Collections.emptySet();
125                     }
126
127                     @Override
128                     public Set<NetconfOperation> getNetconfOperations() {
129                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
130                             @Override
131                             public HandlingPriority canHandle(Document message) {
132                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
133                             }
134
135                             @Override
136                             public Document handle(Document message, NetconfOperationRouter operationRouter)
137                                     throws NetconfDocumentedException {
138                                 try {
139                                     return XmlUtil.readXmlToDocument("<test/>");
140                                 } catch (Exception e) {
141                                     throw new RuntimeException(e);
142                                 }
143                             }
144                         });
145                     }
146
147                     @Override
148                     public Set<NetconfOperationFilter> getFilters() {
149                         return Collections.emptySet();
150                     }
151
152                     @Override
153                     public void close() {
154                     }
155                 };
156             }
157         };
158     }
159
160     @After
161     public void cleanUp() throws Exception {
162         commitNot.close();
163         dispatch.close();
164     }
165
166     @Test
167     public void multipleClients() throws Exception {
168         List<TestingThread> threads = new ArrayList<>();
169
170         final int attempts = 5;
171         for (int i = 0; i < CONCURRENCY; i++) {
172             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
173             threads.add(thread);
174             thread.start();
175         }
176
177         for (TestingThread thread : threads) {
178             thread.join();
179             if(thread.thrownException.isPresent()) {
180                 Exception exception = thread.thrownException.get();
181                 logger.error("Thread for testing client failed", exception);
182                 fail("Client thread " + thread + " failed: " + exception.getMessage());
183             }
184         }
185     }
186
187     @Test
188     public void synchronizationTest() throws Exception {
189         new BlockingThread("foo").run2();
190     }
191
192     @Test
193     public void multipleBlockingClients() throws Exception {
194         List<BlockingThread> threads = new ArrayList<>();
195         for (int i = 0; i < CONCURRENCY; i++) {
196             BlockingThread thread = new BlockingThread(String.valueOf(i));
197             threads.add(thread);
198             thread.start();
199         }
200
201         for (BlockingThread thread : threads) {
202             thread.join();
203             if(thread.thrownException.isPresent()) {
204                 Exception exception = thread.thrownException.get();
205                 logger.error("Thread for testing client failed", exception);
206                 fail("Client thread " + thread + " failed: " + exception.getMessage());
207             }
208         }
209     }
210
211     class BlockingThread extends Thread {
212         private Optional<Exception> thrownException;
213
214         public BlockingThread(String name) {
215             super("client-" + name);
216         }
217
218         @Override
219         public void run() {
220             try {
221                 run2();
222                 thrownException = Optional.absent();
223             } catch (Exception e) {
224                 thrownException = Optional.of(e);
225             }
226         }
227
228         private void run2() throws Exception {
229             InputStream clientHello = checkNotNull(XmlFileLoader
230                     .getResourceAsStream("netconfMessages/client_hello.xml"));
231             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
232
233             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
234             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
235             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
236
237             StringBuffer sb = new StringBuffer();
238             while (sb.toString().endsWith("]]>]]>") == false) {
239                 sb.append((char) inFromServer.read());
240             }
241             logger.info(sb.toString());
242
243             outToServer.write(IOUtils.toByteArray(clientHello));
244             outToServer.write("]]>]]>".getBytes());
245             outToServer.flush();
246             // Thread.sleep(100);
247             outToServer.write(IOUtils.toByteArray(getConfig));
248             outToServer.write("]]>]]>".getBytes());
249             outToServer.flush();
250             Thread.sleep(100);
251             sb = new StringBuffer();
252             while (sb.toString().endsWith("]]>]]>") == false) {
253                 sb.append((char) inFromServer.read());
254             }
255             logger.info(sb.toString());
256             clientSocket.close();
257         }
258     }
259
260     class TestingThread extends Thread {
261
262         private final String clientId;
263         private final int attempts;
264         private Optional<Exception> thrownException;
265
266         TestingThread(String clientId, int attempts) {
267             this.clientId = clientId;
268             this.attempts = attempts;
269             setName("client-" + clientId);
270         }
271
272         @Override
273         public void run() {
274             try {
275                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, NETCONF_CLIENT_DISPATCHER);
276                 long sessionId = netconfClient.getSessionId();
277                 logger.info("Client with sessionid {} hello exchanged", sessionId);
278
279                 final NetconfMessage getMessage = XmlFileLoader
280                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
281                 NetconfMessage result = netconfClient.sendMessage(getMessage);
282                 logger.info("Client with sessionid {} got result {}", sessionId, result);
283                 netconfClient.close();
284                 logger.info("Client with session id {} ended", sessionId);
285                 thrownException = Optional.absent();
286             } catch (final Exception e) {
287                 thrownException = Optional.of(e);
288             }
289         }
290     }
291 }