Initial code drop of netconf protocol implementation
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.Sets;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.util.HashedWheelTimer;
15 import org.apache.commons.io.IOUtils;
16 import org.junit.After;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21 import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
22 import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
23 import org.opendaylight.controller.config.yang.store.api.YangStoreService;
24 import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
25 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
26 import org.opendaylight.controller.netconf.api.NetconfMessage;
27 import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
28 import org.opendaylight.controller.netconf.client.NetconfClient;
29 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
30 import org.opendaylight.controller.netconf.mapping.api.*;
31 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
32 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.w3c.dom.Document;
36
37 import javax.management.ObjectName;
38 import javax.net.ssl.SSLContext;
39 import java.io.DataOutputStream;
40 import java.io.InputStream;
41 import java.io.InputStreamReader;
42 import java.lang.management.ManagementFactory;
43 import java.net.InetSocketAddress;
44 import java.net.Socket;
45 import java.util.ArrayList;
46 import java.util.Collections;
47 import java.util.List;
48 import java.util.Set;
49 import java.util.concurrent.TimeUnit;
50
51 import static com.google.common.base.Preconditions.checkNotNull;
52 import static org.junit.Assert.assertTrue;
53 import static org.mockito.Matchers.any;
54 import static org.mockito.Mockito.doReturn;
55 import static org.mockito.Mockito.mock;
56
57 public class ConcurrentClientsTest {
58
59     private static final int CONCURRENCY = 16;
60     @Mock
61     private YangStoreService yangStoreService;
62     @Mock
63     private ConfigRegistryJMXClient jmxClient;
64
65     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
66
67     static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
68
69     private DefaultCommitNotificationProducer commitNot;
70     private NetconfServerDispatcher dispatch;
71
72     @Before
73     public void setUp() throws Exception {
74         { // init mocks
75             MockitoAnnotations.initMocks(this);
76             final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
77             doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
78             doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
79             doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
80
81             final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
82             doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
83
84             doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
85         }
86
87         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
88         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
89
90         SessionIdProvider idProvider = new SessionIdProvider();
91         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
92                 new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
93
94         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
95
96         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
97                 factoriesListener, commitNot, idProvider);
98         dispatch = new NetconfServerDispatcher(Optional.<SSLContext> absent(), serverNegotiatorFactory, listenerFactory);
99
100         ChannelFuture s = dispatch.createServer(netconfAddress);
101         s.await();
102     }
103
104     private NetconfOperationServiceFactory mockOpF() {
105         return new NetconfOperationServiceFactory() {
106             @Override
107             public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
108                 return new NetconfOperationService() {
109                     @Override
110                     public Set<Capability> getCapabilities() {
111                         return Collections.emptySet();
112                     }
113
114                     @Override
115                     public Set<NetconfOperation> getNetconfOperations() {
116                         return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
117                             @Override
118                             public HandlingPriority canHandle(Document message) {
119                                 return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
120                             }
121
122                             @Override
123                             public Document handle(Document message, NetconfOperationRouter operationRouter)
124                                     throws NetconfDocumentedException {
125                                 try {
126                                     return XmlUtil.readXmlToDocument("<test/>");
127                                 } catch (Exception e) {
128                                     throw new RuntimeException(e);
129                                 }
130                             }
131                         });
132                     }
133
134                     @Override
135                     public Set<NetconfOperationFilter> getFilters() {
136                         return Collections.emptySet();
137                     }
138
139                     @Override
140                     public void close() {
141                     }
142                 };
143             }
144         };
145     }
146
147     @After
148     public void cleanUp() throws Exception {
149         commitNot.close();
150         dispatch.close();
151     }
152
153     @Test
154     public void multipleClients() throws Exception {
155         List<TestingThread> threads = new ArrayList<>();
156
157         final int attempts = 5;
158         for (int i = 0; i < CONCURRENCY; i++) {
159             TestingThread thread = new TestingThread(String.valueOf(i), attempts);
160             threads.add(thread);
161             thread.start();
162         }
163
164         for (TestingThread thread : threads) {
165             thread.join();
166             assertTrue(thread.success);
167         }
168     }
169
170     @Test
171     public void synchronizationTest() throws Exception {
172         new BlockingThread("foo").run2();
173     }
174
175     @Test
176     public void multipleBlockingClients() throws Exception {
177         List<BlockingThread> threads = new ArrayList<>();
178         for (int i = 0; i < CONCURRENCY; i++) {
179             BlockingThread thread = new BlockingThread(String.valueOf(i));
180             threads.add(thread);
181             thread.start();
182         }
183
184         for (BlockingThread thread : threads) {
185             thread.join();
186             assertTrue(thread.success);
187         }
188     }
189
190     class BlockingThread extends Thread {
191         Boolean success;
192
193         public BlockingThread(String name) {
194             super("client-" + name);
195         }
196
197         @Override
198         public void run() {
199             try {
200                 run2();
201                 success = true;
202             } catch (Exception e) {
203                 success = false;
204                 throw new RuntimeException(e);
205             }
206         }
207
208         private void run2() throws Exception {
209             InputStream clientHello = checkNotNull(XmlFileLoader
210                     .getResourceAsStream("netconfMessages/client_hello.xml"));
211             InputStream getConfig = checkNotNull(XmlFileLoader.getResourceAsStream("netconfMessages/getConfig.xml"));
212
213             Socket clientSocket = new Socket(netconfAddress.getHostString(), netconfAddress.getPort());
214             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
215             InputStreamReader inFromServer = new InputStreamReader(clientSocket.getInputStream());
216
217             StringBuffer sb = new StringBuffer();
218             while (sb.toString().endsWith("]]>]]>") == false) {
219                 sb.append((char) inFromServer.read());
220             }
221             logger.info(sb.toString());
222
223             outToServer.write(IOUtils.toByteArray(clientHello));
224             outToServer.write("]]>]]>".getBytes());
225             outToServer.flush();
226             // Thread.sleep(100);
227             outToServer.write(IOUtils.toByteArray(getConfig));
228             outToServer.write("]]>]]>".getBytes());
229             outToServer.flush();
230             Thread.sleep(100);
231             sb = new StringBuffer();
232             while (sb.toString().endsWith("]]>]]>") == false) {
233                 sb.append((char) inFromServer.read());
234             }
235             logger.info(sb.toString());
236             clientSocket.close();
237         }
238     }
239
240     class TestingThread extends Thread {
241
242         private final String clientId;
243         private final int attempts;
244         private Boolean success;
245
246         TestingThread(String clientId, int attempts) {
247             this.clientId = clientId;
248             this.attempts = attempts;
249             setName("client-" + clientId);
250         }
251
252         @Override
253         public void run() {
254             try {
255                 final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress);
256                 long sessionId = netconfClient.getSessionId();
257                 logger.info("Client with sessionid {} hello exchanged", sessionId);
258
259                 final NetconfMessage getMessage = XmlFileLoader
260                         .xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
261                 NetconfMessage result = netconfClient.sendMessage(getMessage);
262                 logger.info("Client with sessionid {} got result {}", sessionId, result);
263                 netconfClient.close();
264                 logger.info("Client with session id {} ended", sessionId);
265                 success = true;
266             } catch (final Exception e) {
267                 success = false;
268                 throw new RuntimeException(e);
269             }
270         }
271     }
272 }