e504c679ba5b3d0e04eba433f193f89208b1a91b
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
9
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.anyBoolean;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.doThrow;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.spy;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
22
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.SettableFuture;
28 import io.netty.buffer.Unpooled;
29 import io.netty.channel.Channel;
30 import io.netty.channel.ChannelConfig;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPromise;
34 import io.netty.channel.DefaultChannelPromise;
35 import io.netty.util.concurrent.EventExecutor;
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Ignore;
42 import org.junit.Test;
43 import org.junit.runner.RunWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.MockitoJUnitRunner;
46 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
47 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
48 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
49 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
50 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
51 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
52 import org.opendaylight.netconf.shaded.sshd.common.future.CloseFuture;
53 import org.opendaylight.netconf.shaded.sshd.common.future.SshFuture;
54 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
55 import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
56 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
57 import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
58 import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
59 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
60 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
61 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
62
63 @RunWith(MockitoJUnitRunner.StrictStubs.class)
64 public class AsyncSshHandlerTest {
65
66     @Mock
67     private NetconfSshClient sshClient;
68     @Mock
69     private AuthenticationHandler authHandler;
70     @Mock
71     private ChannelHandlerContext ctx;
72     @Mock
73     private Channel channel;
74     @Mock
75     private SocketAddress remoteAddress;
76     @Mock
77     private SocketAddress localAddress;
78     @Mock
79     private ChannelConfig channelConfig;
80     @Mock
81     private EventExecutor executor;
82
83     private AsyncSshHandler asyncSshHandler;
84
85     private SshFutureListener<ConnectFuture> sshConnectListener;
86     private SshFutureListener<AuthFuture> sshAuthListener;
87     private SshFutureListener<OpenFuture> sshChannelOpenListener;
88     private ChannelPromise promise;
89
90     @Before
91     public void setUp() throws Exception {
92         stubAuth();
93         stubSshClient();
94         stubChannel();
95         stubCtx();
96
97         promise = getMockedPromise();
98
99         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
100     }
101
102     @After
103     public void tearDown() throws Exception {
104         sshConnectListener = null;
105         sshAuthListener = null;
106         sshChannelOpenListener = null;
107         promise = null;
108         asyncSshHandler.close(ctx, getMockedPromise());
109     }
110
111     private void stubAuth() throws IOException {
112         doReturn("usr").when(authHandler).getUsername();
113
114         final AuthFuture authFuture = mock(AuthFuture.class);
115         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
116             @Override
117             public void onSuccess(final SshFutureListener<AuthFuture> result) {
118                 sshAuthListener = result;
119             }
120         }, MoreExecutors.directExecutor());
121         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
122     }
123
124     @SuppressWarnings("unchecked")
125     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
126         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
127
128         doAnswer(invocation -> {
129             listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
130             return null;
131         }).when(future).addListener(any(SshFutureListener.class));
132
133         return listenerSettableFuture;
134     }
135
136     private void stubCtx() {
137         doReturn(channel).when(ctx).channel();
138         doReturn(ctx).when(ctx).fireChannelActive();
139         doReturn(ctx).when(ctx).fireChannelInactive();
140         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
141         doReturn(getMockedPromise()).when(ctx).newPromise();
142         doReturn(executor).when(ctx).executor();
143         doAnswer(invocation -> {
144             invocation.getArgument(0, Runnable.class).run();
145             return null;
146         }).when(executor).execute(any());
147     }
148
149     private void stubChannel() {
150         doReturn("channel").when(channel).toString();
151     }
152
153     private void stubSshClient() throws IOException {
154         final ConnectFuture connectFuture = mock(ConnectFuture.class);
155         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
156             @Override
157             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
158                 sshConnectListener = result;
159             }
160         }, MoreExecutors.directExecutor());
161         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
162         doReturn(channelConfig).when(channel).config();
163         doReturn(1).when(channelConfig).getConnectTimeoutMillis();
164         doReturn(connectFuture).when(connectFuture).verify(1,TimeUnit.MILLISECONDS);
165     }
166
167     @Test
168     public void testConnectSuccess() throws Exception {
169         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
170
171         final IoInputStream asyncOut = getMockedIoInputStream();
172         final IoOutputStream asyncIn = getMockedIoOutputStream();
173         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
174         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
175         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
176
177         sshConnectListener.operationComplete(connectFuture);
178         sshAuthListener.operationComplete(getSuccessAuthFuture());
179         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
180
181         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
182
183         verify(promise).setSuccess();
184         verify(ctx).fireChannelActive();
185         asyncSshHandler.close(ctx, getMockedPromise());
186         verify(ctx).fireChannelInactive();
187     }
188
189     @Test
190     public void testWrite() throws Exception {
191         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
192
193         final IoInputStream asyncOut = getMockedIoInputStream();
194         final IoOutputStream asyncIn = getMockedIoOutputStream();
195         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
198
199         sshConnectListener.operationComplete(connectFuture);
200         sshAuthListener.operationComplete(getSuccessAuthFuture());
201         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
202
203         final ChannelPromise writePromise = getMockedPromise();
204         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
205
206         verify(writePromise).setSuccess();
207     }
208
209     @Test
210     public void testWriteClosed() throws Exception {
211         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
212
213         final IoInputStream asyncOut = getMockedIoInputStream();
214         final IoOutputStream asyncIn = getMockedIoOutputStream();
215
216         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
217
218         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
219             @Override
220             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
221                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
222                 result.operationComplete(ioWriteFuture);
223             }
224         }, MoreExecutors.directExecutor());
225
226         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
227         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
228         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
229
230         sshConnectListener.operationComplete(connectFuture);
231         sshAuthListener.operationComplete(getSuccessAuthFuture());
232         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
233
234         final ChannelPromise writePromise = getMockedPromise();
235         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
236
237         verify(writePromise).setFailure(any(Throwable.class));
238     }
239
240     @Test
241     public void testWritePendingOne() throws Exception {
242         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
243
244         final IoInputStream asyncOut = getMockedIoInputStream();
245         final IoOutputStream asyncIn = getMockedIoOutputStream();
246         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
247
248         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
249         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
250         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
251
252         sshConnectListener.operationComplete(connectFuture);
253         sshAuthListener.operationComplete(getSuccessAuthFuture());
254         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
255
256         final ChannelPromise firstWritePromise = getMockedPromise();
257
258         // intercept listener for first write,
259         // so we can invoke successful write later thus simulate pending of the first write
260         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
261                 stubAddListener(ioWriteFuture);
262         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
263         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
264         // intercept second listener,
265         // this is the listener for pending write for the pending write to know when pending state ended
266         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
267
268         final ChannelPromise secondWritePromise = getMockedPromise();
269         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
270
271         doReturn(ioWriteFuture).when(asyncIn).writeBuffer(any(Buffer.class));
272
273         verifyNoMoreInteractions(firstWritePromise, secondWritePromise);
274
275         // make first write stop pending
276         firstWriteListener.operationComplete(ioWriteFuture);
277
278         // notify listener for second write that pending has ended
279         pendingListener.get().operationComplete(ioWriteFuture);
280
281         // verify both write promises successful
282         verify(firstWritePromise).setSuccess();
283         verify(secondWritePromise).setSuccess();
284     }
285
286     @Ignore("Pending queue is not limited")
287     @Test
288     public void testWritePendingMax() throws Exception {
289         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
290
291         final IoInputStream asyncOut = getMockedIoInputStream();
292         final IoOutputStream asyncIn = getMockedIoOutputStream();
293         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
294
295         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
296         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
297         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
298
299         sshConnectListener.operationComplete(connectFuture);
300         sshAuthListener.operationComplete(getSuccessAuthFuture());
301         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
302
303         final ChannelPromise firstWritePromise = getMockedPromise();
304
305         // intercept listener for first write,
306         // so we can invoke successful write later thus simulate pending of the first write
307         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
308                 stubAddListener(ioWriteFuture);
309         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
310
311         final ChannelPromise secondWritePromise = getMockedPromise();
312         // now make write throw pending exception
313         doThrow(WritePendingException.class).when(asyncIn).writeBuffer(any(Buffer.class));
314         for (int i = 0; i < 1001; i++) {
315             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
316         }
317
318         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
319     }
320
321     @Test
322     public void testDisconnect() throws Exception {
323         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
324
325         final IoInputStream asyncOut = getMockedIoInputStream();
326         final IoOutputStream asyncIn = getMockedIoOutputStream();
327         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
328         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
329         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
330
331         sshConnectListener.operationComplete(connectFuture);
332         sshAuthListener.operationComplete(getSuccessAuthFuture());
333         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
334
335         final ChannelPromise disconnectPromise = getMockedPromise();
336         asyncSshHandler.disconnect(ctx, disconnectPromise);
337
338         verify(sshSession).close(anyBoolean());
339         verify(disconnectPromise).setSuccess();
340         //verify(ctx).fireChannelInactive();
341     }
342
343     private static OpenFuture getSuccessOpenFuture() {
344         final OpenFuture openFuture = mock(OpenFuture.class);
345         doReturn(null).when(openFuture).getException();
346         return openFuture;
347     }
348
349     private static AuthFuture getSuccessAuthFuture() {
350         final AuthFuture authFuture = mock(AuthFuture.class);
351         doReturn(null).when(authFuture).getException();
352         return authFuture;
353     }
354
355     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
356         final ConnectFuture connectFuture = mock(ConnectFuture.class);
357         doReturn(null).when(connectFuture).getException();
358
359         doReturn(sshSession).when(connectFuture).getSession();
360         return connectFuture;
361     }
362
363     private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
364             throws IOException {
365         final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
366
367         doReturn("serverVersion").when(sshSession).getServerVersion();
368         doReturn(false).when(sshSession).isClosed();
369         doReturn(false).when(sshSession).isClosing();
370         final CloseFuture closeFuture = mock(CloseFuture.class);
371         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
372             @Override
373             public void onSuccess(final SshFutureListener<CloseFuture> result) {
374                 doReturn(true).when(closeFuture).isClosed();
375                 result.operationComplete(closeFuture);
376             }
377         }, MoreExecutors.directExecutor());
378         doReturn(closeFuture).when(sshSession).close(false);
379
380         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(eq("netconf"), any());
381
382         return sshSession;
383     }
384
385     private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
386                                                        final IoOutputStream asyncIn) throws IOException {
387         final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
388
389         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
390         final OpenFuture openFuture = mock(OpenFuture.class);
391
392         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
393             @Override
394             public void onSuccess(final SshFutureListener<OpenFuture> result) {
395                 sshChannelOpenListener = result;
396             }
397         }, MoreExecutors.directExecutor());
398
399         doReturn(openFuture).when(subsystemChannel).open();
400         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
401         doNothing().when(subsystemChannel).onClose(any());
402         doNothing().when(subsystemChannel).close();
403         return subsystemChannel;
404     }
405
406     private static IoOutputStream getMockedIoOutputStream() throws IOException {
407         final IoOutputStream mock = mock(IoOutputStream.class);
408         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
409         doReturn(null).when(ioWriteFuture).getException();
410
411         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
412             @Override
413             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
414                 result.operationComplete(ioWriteFuture);
415             }
416         }, MoreExecutors.directExecutor());
417
418         doReturn(ioWriteFuture).when(mock).writeBuffer(any(Buffer.class));
419         doReturn(false).when(mock).isClosed();
420         doReturn(false).when(mock).isClosing();
421         return mock;
422     }
423
424     private static IoInputStream getMockedIoInputStream() {
425         final IoInputStream mock = mock(IoInputStream.class);
426         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
427         // Always success for read
428         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
429             @Override
430             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
431                 result.operationComplete(ioReadFuture);
432             }
433         }, MoreExecutors.directExecutor());
434         return mock;
435     }
436
437     @Test
438     public void testConnectFailOpenChannel() throws Exception {
439         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
440
441         final IoInputStream asyncOut = getMockedIoInputStream();
442         final IoOutputStream asyncIn = getMockedIoOutputStream();
443         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
444         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
445         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
446
447         sshConnectListener.operationComplete(connectFuture);
448
449         sshAuthListener.operationComplete(getSuccessAuthFuture());
450
451         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
452
453         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
454         verify(promise).setFailure(any(Throwable.class));
455     }
456
457     @Test
458     public void testConnectFailAuth() throws Exception {
459         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
460
461         final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
462         doReturn(true).when(sshSession).isClosed();
463         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
464
465         sshConnectListener.operationComplete(connectFuture);
466
467         final AuthFuture authFuture = getFailedAuthFuture();
468
469         sshAuthListener.operationComplete(authFuture);
470         verify(promise).setFailure(any(Throwable.class));
471         asyncSshHandler.close(ctx, getMockedPromise());
472         verify(ctx, times(0)).fireChannelInactive();
473     }
474
475     private static AuthFuture getFailedAuthFuture() {
476         final AuthFuture authFuture = mock(AuthFuture.class);
477         doReturn(new IllegalStateException()).when(authFuture).getException();
478         return authFuture;
479     }
480
481     private static OpenFuture getFailedOpenFuture() {
482         final OpenFuture openFuture = mock(OpenFuture.class);
483         doReturn(new IllegalStateException()).when(openFuture).getException();
484         return openFuture;
485     }
486
487     @Test
488     public void testConnectFail() throws Exception {
489         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
490
491         final ConnectFuture connectFuture = getFailedConnectFuture();
492         sshConnectListener.operationComplete(connectFuture);
493         verify(promise).setFailure(any(Throwable.class));
494     }
495
496     private static ConnectFuture getFailedConnectFuture() {
497         final ConnectFuture connectFuture = mock(ConnectFuture.class);
498         doReturn(new IllegalStateException()).when(connectFuture).getException();
499         return connectFuture;
500     }
501
502     private ChannelPromise getMockedPromise() {
503         return spy(new DefaultChannelPromise(channel));
504     }
505
506     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
507             implements FutureCallback<SshFutureListener<T>> {
508
509         @Override
510         public abstract void onSuccess(SshFutureListener<T> result);
511
512         @Override
513         public void onFailure(final Throwable throwable) {
514             throw new RuntimeException(throwable);
515         }
516     }
517 }