Merge "Remove redundant code constructs"
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Matchers.anyObject;
14 import static org.mockito.Matchers.anyString;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyZeroInteractions;
24
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.SettableFuture;
30 import io.netty.buffer.ByteBuf;
31 import io.netty.buffer.Unpooled;
32 import io.netty.channel.Channel;
33 import io.netty.channel.ChannelFuture;
34 import io.netty.channel.ChannelHandlerContext;
35 import io.netty.channel.ChannelPromise;
36 import io.netty.channel.DefaultChannelPromise;
37 import io.netty.channel.EventLoop;
38 import java.io.IOException;
39 import java.net.SocketAddress;
40 import org.apache.sshd.client.SshClient;
41 import org.apache.sshd.client.channel.ChannelSubsystem;
42 import org.apache.sshd.client.channel.ClientChannel;
43 import org.apache.sshd.client.future.AuthFuture;
44 import org.apache.sshd.client.future.ConnectFuture;
45 import org.apache.sshd.client.future.OpenFuture;
46 import org.apache.sshd.client.session.ClientSession;
47 import org.apache.sshd.common.future.CloseFuture;
48 import org.apache.sshd.common.future.SshFuture;
49 import org.apache.sshd.common.future.SshFutureListener;
50 import org.apache.sshd.common.io.IoInputStream;
51 import org.apache.sshd.common.io.IoOutputStream;
52 import org.apache.sshd.common.io.IoReadFuture;
53 import org.apache.sshd.common.io.IoWriteFuture;
54 import org.apache.sshd.common.util.buffer.Buffer;
55 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
56 import org.junit.After;
57 import org.junit.Before;
58 import org.junit.Ignore;
59 import org.junit.Test;
60 import org.mockito.Matchers;
61 import org.mockito.Mock;
62 import org.mockito.MockitoAnnotations;
63 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
64
65 public class AsyncSshHandlerTest {
66
67     @Mock
68     private SshClient sshClient;
69     @Mock
70     private AuthenticationHandler authHandler;
71     @Mock
72     private ChannelHandlerContext ctx;
73     @Mock
74     private Channel channel;
75     @Mock
76     private SocketAddress remoteAddress;
77     @Mock
78     private SocketAddress localAddress;
79     @Mock
80     private EventLoop eventLoop;
81
82     private AsyncSshHandler asyncSshHandler;
83
84     private SshFutureListener<ConnectFuture> sshConnectListener;
85     private SshFutureListener<AuthFuture> sshAuthListener;
86     private SshFutureListener<OpenFuture> sshChannelOpenListener;
87
88     private ChannelPromise promise;
89
90     @Before
91     public void setUp() throws Exception {
92         MockitoAnnotations.initMocks(this);
93         stubAuth();
94         stubSshClient();
95         stubChannel();
96         stubEventLoop();
97         stubCtx();
98         stubRemoteAddress();
99
100         promise = getMockedPromise();
101
102         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
103     }
104
105     @After
106     public void tearDown() throws Exception {
107         sshConnectListener = null;
108         sshAuthListener = null;
109         sshChannelOpenListener = null;
110         promise = null;
111         asyncSshHandler.close(ctx, getMockedPromise());
112     }
113
114     private void stubAuth() throws IOException {
115         doReturn("usr").when(authHandler).getUsername();
116
117         final AuthFuture authFuture = mock(AuthFuture.class);
118         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
119             @Override
120             public void onSuccess(final SshFutureListener<AuthFuture> result) {
121                 sshAuthListener = result;
122             }
123         }, MoreExecutors.directExecutor());
124         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
125     }
126
127     @SuppressWarnings("unchecked")
128     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
129         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
130
131         doAnswer(invocation -> {
132             listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
133             return null;
134         }).when(future).addListener(any(SshFutureListener.class));
135
136         return listenerSettableFuture;
137     }
138
139     private void stubRemoteAddress() {
140         doReturn("remote").when(remoteAddress).toString();
141     }
142
143     private void stubCtx() {
144         doReturn(channel).when(ctx).channel();
145         doReturn(ctx).when(ctx).fireChannelActive();
146         doReturn(ctx).when(ctx).fireChannelInactive();
147         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
148         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
149         doReturn(getMockedPromise()).when(ctx).newPromise();
150     }
151
152     private void stubChannel() {
153         doReturn("channel").when(channel).toString();
154     }
155
156     private void stubEventLoop() {
157         doReturn(eventLoop).when(channel).eventLoop();
158         doReturn(Boolean.TRUE).when(eventLoop).inEventLoop();
159     }
160
161     private void stubSshClient() throws IOException {
162         doNothing().when(sshClient).start();
163         final ConnectFuture connectFuture = mock(ConnectFuture.class);
164         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
165             @Override
166             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
167                 sshConnectListener = result;
168             }
169         }, MoreExecutors.directExecutor());
170         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
171     }
172
173     @Test
174     public void testConnectSuccess() throws Exception {
175         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
176
177         final IoInputStream asyncOut = getMockedIoInputStream();
178         final IoOutputStream asyncIn = getMockedIoOutputStream();
179         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
180         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
181         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
182
183         sshConnectListener.operationComplete(connectFuture);
184         sshAuthListener.operationComplete(getSuccessAuthFuture());
185         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
186
187         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
188
189         verify(promise).setSuccess();
190         verify(ctx).fireChannelActive();
191     }
192
193     @Test
194     public void testRead() throws Exception {
195         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
196
197         final IoInputStream asyncOut = getMockedIoInputStream();
198         final IoOutputStream asyncIn = getMockedIoOutputStream();
199         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
200         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
201         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
202
203         sshConnectListener.operationComplete(connectFuture);
204         sshAuthListener.operationComplete(getSuccessAuthFuture());
205         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
206
207         verify(ctx).fireChannelRead(any(ByteBuf.class));
208     }
209
210     @Test
211     public void testReadClosed() throws Exception {
212         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
213
214         final IoInputStream asyncOut = getMockedIoInputStream();
215         final IoReadFuture mockedReadFuture = asyncOut.read(null);
216
217         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
218             @Override
219             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
220                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
221                 doReturn(mockedReadFuture).when(mockedReadFuture)
222                         .removeListener(Matchers.any());
223                 doReturn(true).when(asyncOut).isClosing();
224                 doReturn(true).when(asyncOut).isClosed();
225                 result.operationComplete(mockedReadFuture);
226             }
227         }, MoreExecutors.directExecutor());
228
229         final IoOutputStream asyncIn = getMockedIoOutputStream();
230         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
231         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
232         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
233
234         sshConnectListener.operationComplete(connectFuture);
235         sshAuthListener.operationComplete(getSuccessAuthFuture());
236         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
237
238         verify(ctx).fireChannelInactive();
239     }
240
241     @Test
242     public void testReadFail() throws Exception {
243         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
244
245         final IoInputStream asyncOut = getMockedIoInputStream();
246         final IoReadFuture mockedReadFuture = asyncOut.read(null);
247
248         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
249             @Override
250             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
251                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
252                 doReturn(mockedReadFuture).when(mockedReadFuture)
253                         .removeListener(Matchers.any());
254                 result.operationComplete(mockedReadFuture);
255             }
256         }, MoreExecutors.directExecutor());
257
258         final IoOutputStream asyncIn = getMockedIoOutputStream();
259         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
260         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
261         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
262
263         sshConnectListener.operationComplete(connectFuture);
264         sshAuthListener.operationComplete(getSuccessAuthFuture());
265         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
266
267         verify(ctx).fireChannelInactive();
268     }
269
270     @Test
271     public void testWrite() throws Exception {
272         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
273
274         final IoInputStream asyncOut = getMockedIoInputStream();
275         final IoOutputStream asyncIn = getMockedIoOutputStream();
276         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
277         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
278         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
279
280         sshConnectListener.operationComplete(connectFuture);
281         sshAuthListener.operationComplete(getSuccessAuthFuture());
282         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
283
284         final ChannelPromise writePromise = getMockedPromise();
285         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
286
287         verify(writePromise).setSuccess();
288     }
289
290     @Test
291     public void testWriteClosed() throws Exception {
292         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
293
294         final IoInputStream asyncOut = getMockedIoInputStream();
295         final IoOutputStream asyncIn = getMockedIoOutputStream();
296
297         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
298
299         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
300             @Override
301             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
302                 doReturn(false).when(ioWriteFuture).isWritten();
303                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
304                 doReturn(true).when(asyncIn).isClosing();
305                 doReturn(true).when(asyncIn).isClosed();
306                 result.operationComplete(ioWriteFuture);
307             }
308         }, MoreExecutors.directExecutor());
309
310         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
311         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
312         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
313
314         sshConnectListener.operationComplete(connectFuture);
315         sshAuthListener.operationComplete(getSuccessAuthFuture());
316         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
317
318         final ChannelPromise writePromise = getMockedPromise();
319         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
320
321         verify(writePromise).setFailure(any(Throwable.class));
322     }
323
324     @Test
325     public void testWritePendingOne() throws Exception {
326         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
327
328         final IoInputStream asyncOut = getMockedIoInputStream();
329         final IoOutputStream asyncIn = getMockedIoOutputStream();
330         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
331
332         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
333         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
334         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
335
336         sshConnectListener.operationComplete(connectFuture);
337         sshAuthListener.operationComplete(getSuccessAuthFuture());
338         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
339
340         final ChannelPromise firstWritePromise = getMockedPromise();
341
342         // intercept listener for first write,
343         // so we can invoke successful write later thus simulate pending of the first write
344         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
345                 stubAddListener(ioWriteFuture);
346         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
347         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
348         // intercept second listener,
349         // this is the listener for pending write for the pending write to know when pending state ended
350         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
351
352         final ChannelPromise secondWritePromise = getMockedPromise();
353         // now make write throw pending exception
354         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
355         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
356
357         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
358
359         verifyZeroInteractions(firstWritePromise, secondWritePromise);
360
361         // make first write stop pending
362         firstWriteListener.operationComplete(ioWriteFuture);
363
364         // notify listener for second write that pending has ended
365         pendingListener.get().operationComplete(ioWriteFuture);
366
367         // verify both write promises successful
368         verify(firstWritePromise).setSuccess();
369         verify(secondWritePromise).setSuccess();
370     }
371
372     @Ignore("Pending queue is not limited")
373     @Test
374     public void testWritePendingMax() throws Exception {
375         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
376
377         final IoInputStream asyncOut = getMockedIoInputStream();
378         final IoOutputStream asyncIn = getMockedIoOutputStream();
379         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
380
381         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
382         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
383         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
384
385         sshConnectListener.operationComplete(connectFuture);
386         sshAuthListener.operationComplete(getSuccessAuthFuture());
387         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
388
389         final ChannelPromise firstWritePromise = getMockedPromise();
390
391         // intercept listener for first write,
392         // so we can invoke successful write later thus simulate pending of the first write
393         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
394                 stubAddListener(ioWriteFuture);
395         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
396
397         final ChannelPromise secondWritePromise = getMockedPromise();
398         // now make write throw pending exception
399         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
400         for (int i = 0; i < 1001; i++) {
401             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
402         }
403
404         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
405     }
406
407     @Test
408     public void testDisconnect() throws Exception {
409         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
410
411         final IoInputStream asyncOut = getMockedIoInputStream();
412         final IoOutputStream asyncIn = getMockedIoOutputStream();
413         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
414         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
415         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
416
417         sshConnectListener.operationComplete(connectFuture);
418         sshAuthListener.operationComplete(getSuccessAuthFuture());
419         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
420
421         final ChannelPromise disconnectPromise = getMockedPromise();
422         asyncSshHandler.disconnect(ctx, disconnectPromise);
423
424         verify(sshSession).close(anyBoolean());
425         verify(disconnectPromise).setSuccess();
426         verify(ctx).fireChannelInactive();
427     }
428
429     private static OpenFuture getSuccessOpenFuture() {
430         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
431         doReturn(true).when(failedOpenFuture).isOpened();
432         return failedOpenFuture;
433     }
434
435     private static AuthFuture getSuccessAuthFuture() {
436         final AuthFuture authFuture = mock(AuthFuture.class);
437         doReturn(true).when(authFuture).isSuccess();
438         return authFuture;
439     }
440
441     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
442         final ConnectFuture connectFuture = mock(ConnectFuture.class);
443         doReturn(true).when(connectFuture).isConnected();
444
445         doReturn(sshSession).when(connectFuture).getSession();
446         return connectFuture;
447     }
448
449     private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
450         final ClientSession sshSession = mock(ClientSession.class);
451
452         doReturn("sshSession").when(sshSession).toString();
453         doReturn("serverVersion").when(sshSession).getServerVersion();
454         doReturn(false).when(sshSession).isClosed();
455         doReturn(false).when(sshSession).isClosing();
456         final CloseFuture closeFuture = mock(CloseFuture.class);
457         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
458             @Override
459             public void onSuccess(final SshFutureListener<CloseFuture> result) {
460                 doReturn(true).when(closeFuture).isClosed();
461                 result.operationComplete(closeFuture);
462             }
463         }, MoreExecutors.directExecutor());
464         doReturn(closeFuture).when(sshSession).close(false);
465
466         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
467
468         return sshSession;
469     }
470
471     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
472                                                        final IoOutputStream asyncIn) throws IOException {
473         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
474         doReturn("subsystemChannel").when(subsystemChannel).toString();
475
476         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
477         final OpenFuture openFuture = mock(OpenFuture.class);
478
479         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
480             @Override
481             public void onSuccess(final SshFutureListener<OpenFuture> result) {
482                 sshChannelOpenListener = result;
483             }
484         }, MoreExecutors.directExecutor());
485
486         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
487
488         doReturn(openFuture).when(subsystemChannel).open();
489         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
490         return subsystemChannel;
491     }
492
493     private static IoOutputStream getMockedIoOutputStream() {
494         final IoOutputStream mock = mock(IoOutputStream.class);
495         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
496         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.any());
497         doReturn(true).when(ioWriteFuture).isWritten();
498
499         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
500             @Override
501             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
502                 result.operationComplete(ioWriteFuture);
503             }
504         }, MoreExecutors.directExecutor());
505
506         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
507         doReturn(false).when(mock).isClosed();
508         doReturn(false).when(mock).isClosing();
509         return mock;
510     }
511
512     private static IoInputStream getMockedIoInputStream() {
513         final IoInputStream mock = mock(IoInputStream.class);
514         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
515         doReturn(null).when(ioReadFuture).getException();
516         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.any());
517         doReturn(5).when(ioReadFuture).getRead();
518         doReturn(new ByteArrayBuffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
519         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.any());
520
521         // Always success for read
522         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
523             @Override
524             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
525                 result.operationComplete(ioReadFuture);
526             }
527         }, MoreExecutors.directExecutor());
528
529         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
530         doReturn(false).when(mock).isClosed();
531         doReturn(false).when(mock).isClosing();
532         return mock;
533     }
534
535     @Test
536     public void testConnectFailOpenChannel() throws Exception {
537         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
538
539         final IoInputStream asyncOut = getMockedIoInputStream();
540         final IoOutputStream asyncIn = getMockedIoOutputStream();
541         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
542         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
543         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
544
545         sshConnectListener.operationComplete(connectFuture);
546
547         sshAuthListener.operationComplete(getSuccessAuthFuture());
548
549         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
550
551         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
552         verify(promise).setFailure(any(Throwable.class));
553     }
554
555     @Test
556     public void testConnectFailAuth() throws Exception {
557         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
558
559         final ClientSession sshSession = mock(ClientSession.class);
560         doReturn(true).when(sshSession).isClosed();
561         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
562
563         sshConnectListener.operationComplete(connectFuture);
564
565         final AuthFuture authFuture = getFailedAuthFuture();
566
567         sshAuthListener.operationComplete(authFuture);
568         verify(promise).setFailure(any(Throwable.class));
569     }
570
571     private static AuthFuture getFailedAuthFuture() {
572         final AuthFuture authFuture = mock(AuthFuture.class);
573         doReturn(false).when(authFuture).isSuccess();
574         doReturn(new IllegalStateException()).when(authFuture).getException();
575         return authFuture;
576     }
577
578     private static OpenFuture getFailedOpenFuture() {
579         final OpenFuture authFuture = mock(OpenFuture.class);
580         doReturn(false).when(authFuture).isOpened();
581         doReturn(new IllegalStateException()).when(authFuture).getException();
582         return authFuture;
583     }
584
585     @Test
586     public void testConnectFail() throws Exception {
587         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
588
589         final ConnectFuture connectFuture = getFailedConnectFuture();
590         sshConnectListener.operationComplete(connectFuture);
591         verify(promise).setFailure(any(Throwable.class));
592     }
593
594     private static ConnectFuture getFailedConnectFuture() {
595         final ConnectFuture connectFuture = mock(ConnectFuture.class);
596         doReturn(false).when(connectFuture).isConnected();
597         doReturn(new IllegalStateException()).when(connectFuture).getException();
598         return connectFuture;
599     }
600
601     private ChannelPromise getMockedPromise() {
602         return spy(new DefaultChannelPromise(channel));
603     }
604
605     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
606             implements FutureCallback<SshFutureListener<T>> {
607
608         @Override
609         public abstract void onSuccess(SshFutureListener<T> result);
610
611         @Override
612         public void onFailure(final Throwable throwable) {
613             throw new RuntimeException(throwable);
614         }
615     }
616 }