CDS: Add stress test RPC to the cars model
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / test / java / org / opendaylight / controller / sal / connect / netconf / listener / NetconfDeviceCommunicatorTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connect.netconf.listener;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Matchers.same;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.reset;
22 import static org.mockito.Mockito.spy;
23 import static org.mockito.Mockito.timeout;
24 import static org.mockito.Mockito.verify;
25 import static org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants.RPC_REPLY_KEY;
26 import static org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0;
27
28 import com.google.common.base.Strings;
29 import com.google.common.collect.Sets;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.EventLoopGroup;
33 import io.netty.channel.nio.NioEventLoopGroup;
34 import io.netty.util.HashedWheelTimer;
35 import io.netty.util.Timer;
36 import io.netty.util.concurrent.Future;
37 import io.netty.util.concurrent.GenericFutureListener;
38 import io.netty.util.concurrent.GlobalEventExecutor;
39 import java.io.ByteArrayInputStream;
40 import java.net.InetSocketAddress;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.UUID;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import javax.xml.parsers.DocumentBuilderFactory;
47 import javax.xml.parsers.ParserConfigurationException;
48 import org.apache.commons.lang3.StringUtils;
49 import org.junit.Before;
50 import org.junit.Test;
51 import org.mockito.ArgumentCaptor;
52 import org.mockito.Mock;
53 import org.mockito.MockitoAnnotations;
54 import org.opendaylight.controller.netconf.api.NetconfMessage;
55 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
56 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
57 import org.opendaylight.controller.netconf.client.NetconfClientSession;
58 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
59 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
60 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
61 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
62 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
63 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
64 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
65 import org.opendaylight.protocol.framework.ReconnectStrategy;
66 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
67 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
68 import org.opendaylight.yangtools.yang.common.QName;
69 import org.opendaylight.yangtools.yang.common.RpcError;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.w3c.dom.Document;
72 import org.w3c.dom.Element;
73
74 public class NetconfDeviceCommunicatorTest {
75
76     @Mock
77     NetconfClientSession mockSession;
78
79     @Mock
80     RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> mockDevice;
81
82     NetconfDeviceCommunicator communicator;
83
84     @Before
85     public void setUp() throws Exception {
86         MockitoAnnotations.initMocks( this );
87
88         communicator = new NetconfDeviceCommunicator( new RemoteDeviceId( "test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice);
89     }
90
91     @SuppressWarnings("unchecked")
92     void setupSession() {
93         doReturn(Collections.<String>emptySet()).when(mockSession).getServerCapabilities();
94         doNothing().when(mockDevice).onRemoteSessionUp(any(NetconfSessionPreferences.class),
95                 any(NetconfDeviceCommunicator.class));
96         communicator.onSessionUp(mockSession);
97     }
98
99     private ListenableFuture<RpcResult<NetconfMessage>> sendRequest() throws Exception {
100         return sendRequest( UUID.randomUUID().toString() );
101     }
102
103     @SuppressWarnings("unchecked")
104     private ListenableFuture<RpcResult<NetconfMessage>> sendRequest( String messageID ) throws Exception {
105         Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
106         Element element = doc.createElement( "request" );
107         element.setAttribute( "message-id", messageID );
108         doc.appendChild( element );
109         NetconfMessage message = new NetconfMessage( doc );
110
111         ChannelFuture mockChannelFuture = mock( ChannelFuture.class );
112         doReturn( mockChannelFuture ).when( mockChannelFuture )
113             .addListener( any( (GenericFutureListener.class ) ) );
114         doReturn( mockChannelFuture ).when( mockSession ).sendMessage( same( message ) );
115
116         ListenableFuture<RpcResult<NetconfMessage>> resultFuture =
117                                       communicator.sendRequest( message, QName.create( "mock rpc" ) );
118
119         assertNotNull( "ListenableFuture is null", resultFuture );
120         return resultFuture;
121     }
122
123     @Test
124     public void testOnSessionUp() {
125         String testCapability = "urn:opendaylight:params:xml:ns:test?module=test-module&revision=2014-06-02";
126         Collection<String> serverCapabilities =
127                 Sets.newHashSet( NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString(),
128                                  NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString(),
129                                  testCapability );
130         doReturn( serverCapabilities ).when( mockSession ).getServerCapabilities();
131
132         ArgumentCaptor<NetconfSessionPreferences> NetconfSessionPreferences =
133                                               ArgumentCaptor.forClass( NetconfSessionPreferences.class );
134         doNothing().when( mockDevice ).onRemoteSessionUp( NetconfSessionPreferences.capture(), eq( communicator ) );
135
136         communicator.onSessionUp( mockSession );
137
138         verify( mockSession ).getServerCapabilities();
139         verify( mockDevice ).onRemoteSessionUp( NetconfSessionPreferences.capture(), eq( communicator ) );
140
141         NetconfSessionPreferences actualCapabilites = NetconfSessionPreferences.getValue();
142         assertEquals( "containsModuleCapability", true, actualCapabilites.containsNonModuleCapability(
143                 NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString()) );
144         assertEquals( "containsModuleCapability", false, actualCapabilites.containsNonModuleCapability(testCapability) );
145         assertEquals( "getModuleBasedCaps", Sets.newHashSet(
146                             QName.create( "urn:opendaylight:params:xml:ns:test", "2014-06-02", "test-module" )),
147                       actualCapabilites.getModuleBasedCaps() );
148         assertEquals( "isRollbackSupported", true, actualCapabilites.isRollbackSupported() );
149         assertEquals( "isMonitoringSupported", true, actualCapabilites.isMonitoringSupported() );
150     }
151
152     @SuppressWarnings("unchecked")
153     @Test(timeout=5000)
154     public void testOnSessionDown() throws Exception {
155         setupSession();
156
157         ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest();
158         ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest();
159
160         doNothing().when( mockDevice ).onRemoteSessionDown();
161
162         communicator.onSessionDown( mockSession, new Exception( "mock ex" ) );
163
164         verifyErrorRpcResult( resultFuture1.get(), RpcError.ErrorType.TRANSPORT, "operation-failed" );
165         verifyErrorRpcResult( resultFuture2.get(), RpcError.ErrorType.TRANSPORT, "operation-failed" );
166
167         verify( mockDevice ).onRemoteSessionDown();
168
169         reset( mockDevice );
170
171         communicator.onSessionDown( mockSession, new Exception( "mock ex" ) );
172
173         verify( mockDevice, never() ).onRemoteSessionDown();
174     }
175
176     @Test
177     public void testOnSessionTerminated() throws Exception {
178         setupSession();
179
180         ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest();
181
182         doNothing().when( mockDevice ).onRemoteSessionDown();
183
184         String reasonText = "testing terminate";
185         NetconfTerminationReason reason = new NetconfTerminationReason( reasonText );
186         communicator.onSessionTerminated( mockSession, reason );
187
188         RpcError rpcError = verifyErrorRpcResult( resultFuture.get(), RpcError.ErrorType.TRANSPORT,
189                                                   "operation-failed" );
190         assertEquals( "RpcError message", reasonText, rpcError.getMessage() );
191
192         verify( mockDevice ).onRemoteSessionDown();
193     }
194
195     @Test
196     public void testClose() throws Exception {
197         communicator.close();
198         verify( mockDevice, never() ).onRemoteSessionDown();
199     }
200
201     @SuppressWarnings({ "rawtypes", "unchecked" })
202     @Test
203     public void testSendRequest() throws Exception {
204         setupSession();
205
206         NetconfMessage message = new NetconfMessage(
207                               DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument() );
208         QName rpc = QName.create( "mock rpc" );
209
210         ArgumentCaptor<GenericFutureListener> futureListener =
211                                             ArgumentCaptor.forClass( GenericFutureListener.class );
212
213         ChannelFuture mockChannelFuture = mock( ChannelFuture.class );
214         doReturn( mockChannelFuture ).when( mockChannelFuture ).addListener( futureListener.capture() );
215         doReturn( mockChannelFuture ).when( mockSession ).sendMessage( same( message ) );
216
217         ListenableFuture<RpcResult<NetconfMessage>> resultFuture = communicator.sendRequest( message, rpc );
218
219         verify( mockSession ).sendMessage( same( message ) );
220
221         assertNotNull( "ListenableFuture is null", resultFuture );
222
223         verify( mockChannelFuture ).addListener( futureListener.capture() );
224         Future<Void> operationFuture = mock( Future.class );
225         doReturn( true ).when( operationFuture ).isSuccess();
226         doReturn( true ).when( operationFuture ).isDone();
227         futureListener.getValue().operationComplete( operationFuture );
228
229         try {
230             resultFuture.get( 1, TimeUnit.MILLISECONDS ); // verify it's not cancelled or has an error set
231         }
232         catch( TimeoutException e ) {} // expected
233     }
234
235     @Test
236     public void testSendRequestWithNoSession() throws Exception {
237         NetconfMessage message = new NetconfMessage(
238                               DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument() );
239         QName rpc = QName.create( "mock rpc" );
240
241         ListenableFuture<RpcResult<NetconfMessage>> resultFuture = communicator.sendRequest( message, rpc );
242
243         assertNotNull( "ListenableFuture is null", resultFuture );
244
245         // Should have an immediate result
246         RpcResult<NetconfMessage> rpcResult = resultFuture.get( 3, TimeUnit.MILLISECONDS );
247
248         verifyErrorRpcResult( rpcResult, RpcError.ErrorType.TRANSPORT, "operation-failed" );
249     }
250
251     @SuppressWarnings({ "rawtypes", "unchecked" })
252     @Test
253     public void testSendRequestWithWithSendFailure() throws Exception {
254         setupSession();
255
256         NetconfMessage message = new NetconfMessage(
257                               DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument() );
258         QName rpc = QName.create( "mock rpc" );
259
260         ArgumentCaptor<GenericFutureListener> futureListener =
261                                             ArgumentCaptor.forClass( GenericFutureListener.class );
262
263         ChannelFuture mockChannelFuture = mock( ChannelFuture.class );
264         doReturn( mockChannelFuture ).when( mockChannelFuture ).addListener( futureListener.capture() );
265         doReturn( mockChannelFuture ).when( mockSession ).sendMessage( same( message ) );
266
267         ListenableFuture<RpcResult<NetconfMessage>> resultFuture = communicator.sendRequest( message, rpc );
268
269         assertNotNull( "ListenableFuture is null", resultFuture );
270
271         verify( mockChannelFuture ).addListener( futureListener.capture() );
272
273         Future<Void> operationFuture = mock( Future.class );
274         doReturn( false ).when( operationFuture ).isSuccess();
275         doReturn( true ).when( operationFuture ).isDone();
276         doReturn( new Exception( "mock error" ) ).when( operationFuture ).cause();
277         futureListener.getValue().operationComplete( operationFuture );
278
279         // Should have an immediate result
280         RpcResult<NetconfMessage> rpcResult = resultFuture.get( 3, TimeUnit.MILLISECONDS );
281
282         RpcError rpcError = verifyErrorRpcResult( rpcResult, RpcError.ErrorType.TRANSPORT, "operation-failed" );
283         assertEquals( "RpcError message contains \"mock error\"", true,
284                     rpcError.getMessage().contains( "mock error" ) );
285     }
286
287     private NetconfMessage createSuccessResponseMessage( String messageID ) throws ParserConfigurationException {
288         Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
289         Element rpcReply = doc.createElementNS( URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0, RPC_REPLY_KEY );
290         rpcReply.setAttribute( "message-id", messageID );
291         Element element = doc.createElementNS( "ns", "data" );
292         element.setTextContent( messageID );
293         rpcReply.appendChild( element );
294         doc.appendChild( rpcReply );
295
296         return new NetconfMessage( doc );
297     }
298
299     //Test scenario verifying whether missing message is handled
300     @Test
301     public void testOnMissingResponseMessage() throws Exception {
302
303         setupSession();
304
305         String messageID1 = UUID.randomUUID().toString();
306         ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1 );
307
308         String messageID2 = UUID.randomUUID().toString();
309         ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2 );
310
311         String messageID3 = UUID.randomUUID().toString();
312         ListenableFuture<RpcResult<NetconfMessage>> resultFuture3 = sendRequest( messageID3 );
313
314         //response messages 1,2 are omitted
315         communicator.onMessage( mockSession, createSuccessResponseMessage( messageID3 ) );
316
317         verifyResponseMessage( resultFuture3.get(), messageID3 );
318     }
319
320     @Test
321     public void testOnSuccessfulResponseMessage() throws Exception {
322         setupSession();
323
324         String messageID1 = UUID.randomUUID().toString();
325         ListenableFuture<RpcResult<NetconfMessage>> resultFuture1 = sendRequest( messageID1 );
326
327         String messageID2 = UUID.randomUUID().toString();
328         ListenableFuture<RpcResult<NetconfMessage>> resultFuture2 = sendRequest( messageID2 );
329
330         communicator.onMessage( mockSession, createSuccessResponseMessage( messageID1 ) );
331         communicator.onMessage( mockSession, createSuccessResponseMessage( messageID2 ) );
332
333         verifyResponseMessage( resultFuture1.get(), messageID1 );
334         verifyResponseMessage( resultFuture2.get(), messageID2 );
335     }
336
337     @Test
338     public void testOnResponseMessageWithError() throws Exception {
339         setupSession();
340
341         String messageID = UUID.randomUUID().toString();
342         ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID );
343
344         communicator.onMessage( mockSession, createErrorResponseMessage( messageID ) );
345
346         RpcError rpcError = verifyErrorRpcResult( resultFuture.get(), RpcError.ErrorType.RPC,
347                                                   "missing-attribute" );
348         assertEquals( "RpcError message", "Missing attribute", rpcError.getMessage() );
349
350         String errorInfo = rpcError.getInfo();
351         assertNotNull( "RpcError info is null", errorInfo );
352         assertEquals( "Error info contains \"foo\"", true,
353                       errorInfo.contains( "<bad-attribute>foo</bad-attribute>" ) );
354         assertEquals( "Error info contains \"bar\"", true,
355                       errorInfo.contains( "<bad-element>bar</bad-element>" ) );
356     }
357
358     /**
359      * Test whether reconnect is scheduled properly
360      */
361     @Test
362     public void testNetconfDeviceReconnectInCommunicator() throws Exception {
363         final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device = mock(RemoteDevice.class);
364
365         final TimedReconnectStrategy timedReconnectStrategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, 10000, 0, 1.0, null, 100L, null);
366         final ReconnectStrategy reconnectStrategy = spy(new ReconnectStrategy() {
367             @Override
368             public int getConnectTimeout() throws Exception {
369                 return timedReconnectStrategy.getConnectTimeout();
370             }
371
372             @Override
373             public Future<Void> scheduleReconnect(final Throwable cause) {
374                 return timedReconnectStrategy.scheduleReconnect(cause);
375             }
376
377             @Override
378             public void reconnectSuccessful() {
379                 timedReconnectStrategy.reconnectSuccessful();
380             }
381         });
382
383         final EventLoopGroup group = new NioEventLoopGroup();
384         final Timer time = new HashedWheelTimer();
385         try {
386             final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), device);
387             final NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create()
388                     .withAddress(new InetSocketAddress("localhost", 65000))
389                     .withReconnectStrategy(reconnectStrategy)
390                     .withConnectStrategyFactory(new ReconnectStrategyFactory() {
391                         @Override
392                         public ReconnectStrategy createReconnectStrategy() {
393                             return reconnectStrategy;
394                         }
395                     })
396                     .withAuthHandler(new LoginPassword("admin", "admin"))
397                     .withConnectionTimeoutMillis(10000)
398                     .withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH)
399                     .withSessionListener(listener)
400                     .build();
401
402             listener.initializeRemoteConnection(new NetconfClientDispatcherImpl(group, group, time), cfg);
403
404             verify(reconnectStrategy, timeout((int) TimeUnit.MINUTES.toMillis(3)).times(101)).scheduleReconnect(any(Throwable.class));
405         } finally {
406             time.stop();
407             group.shutdownGracefully();
408         }
409     }
410
411     @Test
412     public void testOnResponseMessageWithWrongMessageID() throws Exception {
413         setupSession();
414
415         String messageID = UUID.randomUUID().toString();
416         ListenableFuture<RpcResult<NetconfMessage>> resultFuture = sendRequest( messageID );
417
418         communicator.onMessage( mockSession, createSuccessResponseMessage( UUID.randomUUID().toString() ) );
419
420         RpcError rpcError = verifyErrorRpcResult( resultFuture.get(), RpcError.ErrorType.PROTOCOL,
421                                                   "bad-attribute" );
422         assertEquals( "RpcError message non-empty", true,
423                       !Strings.isNullOrEmpty( rpcError.getMessage() ) );
424
425         String errorInfo = rpcError.getInfo();
426         assertNotNull( "RpcError info is null", errorInfo );
427         assertEquals( "Error info contains \"actual-message-id\"", true,
428                       errorInfo.contains( "actual-message-id" ) );
429         assertEquals( "Error info contains \"expected-message-id\"", true,
430                       errorInfo.contains( "expected-message-id" ) );
431     }
432
433     private NetconfMessage createErrorResponseMessage( String messageID ) throws Exception {
434         String xmlStr =
435             "<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"" +
436             "           message-id=\"" + messageID + "\">" +
437             "  <rpc-error>" +
438             "    <error-type>rpc</error-type>" +
439             "    <error-tag>missing-attribute</error-tag>" +
440             "    <error-severity>error</error-severity>" +
441             "    <error-message>Missing attribute</error-message>" +
442             "    <error-info>" +
443             "      <bad-attribute>foo</bad-attribute>" +
444             "      <bad-element>bar</bad-element>" +
445             "    </error-info>" +
446             "  </rpc-error>" +
447             "</rpc-reply>";
448
449         ByteArrayInputStream bis = new ByteArrayInputStream( xmlStr.getBytes() );
450         Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse( bis );
451         return new NetconfMessage( doc );
452     }
453
454     private void verifyResponseMessage( RpcResult<NetconfMessage> rpcResult, String dataText ) {
455         assertNotNull( "RpcResult is null", rpcResult );
456         assertEquals( "isSuccessful", true, rpcResult.isSuccessful() );
457         NetconfMessage messageResult = rpcResult.getResult();
458         assertNotNull( "getResult", messageResult );
459 //        List<SimpleNode<?>> nodes = messageResult.getSimpleNodesByName(
460 //                                         QName.create( URI.create( "ns" ), null, "data" ) );
461 //        assertNotNull( "getSimpleNodesByName", nodes );
462 //        assertEquals( "List<SimpleNode<?>> size", 1, nodes.size() );
463 //        assertEquals( "SimpleNode value", dataText, nodes.iterator().next().getValue() );
464     }
465
466     private RpcError verifyErrorRpcResult( RpcResult<NetconfMessage> rpcResult,
467                                            RpcError.ErrorType expErrorType, String expErrorTag ) {
468         assertNotNull( "RpcResult is null", rpcResult );
469         assertEquals( "isSuccessful", false, rpcResult.isSuccessful() );
470         assertNotNull( "RpcResult errors is null", rpcResult.getErrors() );
471         assertEquals( "Errors size", 1, rpcResult.getErrors().size() );
472         RpcError rpcError = rpcResult.getErrors().iterator().next();
473         assertEquals( "getErrorSeverity", RpcError.ErrorSeverity.ERROR, rpcError.getSeverity() );
474         assertEquals( "getErrorType", expErrorType, rpcError.getErrorType() );
475         assertEquals( "getErrorTag", expErrorTag, rpcError.getTag() );
476         assertTrue( "getMessage is empty", StringUtils.isNotEmpty( rpcError.getMessage() ) );
477         return rpcError;
478     }
479 }