BUG-2340 Fix improper cleanup of resources in netconf ssh handler
[controller.git] / opendaylight / netconf / netconf-netty-util / src / test / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Matchers.anyObject;
14 import static org.mockito.Matchers.anyString;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyZeroInteractions;
24
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.SettableFuture;
29 import io.netty.buffer.ByteBuf;
30 import io.netty.buffer.Unpooled;
31 import io.netty.channel.Channel;
32 import io.netty.channel.ChannelFuture;
33 import io.netty.channel.ChannelHandlerContext;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.DefaultChannelPromise;
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import org.apache.sshd.ClientChannel;
39 import org.apache.sshd.ClientSession;
40 import org.apache.sshd.SshClient;
41 import org.apache.sshd.client.channel.ChannelSubsystem;
42 import org.apache.sshd.client.future.AuthFuture;
43 import org.apache.sshd.client.future.ConnectFuture;
44 import org.apache.sshd.client.future.OpenFuture;
45 import org.apache.sshd.common.future.CloseFuture;
46 import org.apache.sshd.common.future.SshFuture;
47 import org.apache.sshd.common.future.SshFutureListener;
48 import org.apache.sshd.common.io.IoInputStream;
49 import org.apache.sshd.common.io.IoOutputStream;
50 import org.apache.sshd.common.io.IoReadFuture;
51 import org.apache.sshd.common.io.IoWriteFuture;
52 import org.apache.sshd.common.util.Buffer;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Ignore;
56 import org.junit.Test;
57 import org.mockito.Matchers;
58 import org.mockito.Mock;
59 import org.mockito.MockitoAnnotations;
60 import org.mockito.invocation.InvocationOnMock;
61 import org.mockito.stubbing.Answer;
62 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
63
64 public class AsyncSshHandlerTest {
65
66     @Mock
67     private SshClient sshClient;
68     @Mock
69     private AuthenticationHandler authHandler;
70     @Mock
71     private ChannelHandlerContext ctx;
72     @Mock
73     private Channel channel;
74     @Mock
75     private SocketAddress remoteAddress;
76     @Mock
77     private SocketAddress localAddress;
78
79     private AsyncSshHandler asyncSshHandler;
80
81     private SshFutureListener<ConnectFuture> sshConnectListener;
82     private SshFutureListener<AuthFuture> sshAuthListener;
83     private SshFutureListener<OpenFuture> sshChannelOpenListener;
84
85     private ChannelPromise promise;
86
87     @Before
88     public void setUp() throws Exception {
89         MockitoAnnotations.initMocks(this);
90         stubAuth();
91         stubSshClient();
92         stubChannel();
93         stubCtx();
94         stubRemoteAddress();
95
96         promise = getMockedPromise();
97
98         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
99     }
100
101     @After
102     public void tearDown() throws Exception {
103         sshConnectListener = null;
104         sshAuthListener = null;
105         sshChannelOpenListener = null;
106         promise = null;
107         asyncSshHandler.close(ctx, getMockedPromise());
108     }
109
110     private void stubAuth() throws IOException {
111         doReturn("usr").when(authHandler).getUsername();
112
113         final AuthFuture authFuture = mock(AuthFuture.class);
114         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
115             @Override
116             public void onSuccess(final SshFutureListener<AuthFuture> result) {
117                 sshAuthListener = result;
118             }
119         });
120         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
121     }
122
123     @SuppressWarnings("unchecked")
124     private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
125         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
126
127         doAnswer(new Answer<Object>() {
128             @Override
129             public Object answer(final InvocationOnMock invocation) throws Throwable {
130                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
131                 return null;
132             }
133         }).when(future).addListener(any(SshFutureListener.class));
134
135         return listenerSettableFuture;
136     }
137
138     private void stubRemoteAddress() {
139         doReturn("remote").when(remoteAddress).toString();
140     }
141
142     private void stubCtx() {
143         doReturn(channel).when(ctx).channel();
144         doReturn(ctx).when(ctx).fireChannelActive();
145         doReturn(ctx).when(ctx).fireChannelInactive();
146         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
147         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
148         doReturn(getMockedPromise()).when(ctx).newPromise();
149     }
150
151     private void stubChannel() {
152         doReturn("channel").when(channel).toString();
153     }
154
155     private void stubSshClient() {
156         doNothing().when(sshClient).start();
157         final ConnectFuture connectFuture = mock(ConnectFuture.class);
158         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
159             @Override
160             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
161                 sshConnectListener = result;
162             }
163         });
164         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
165     }
166
167     @Test
168     public void testConnectSuccess() throws Exception {
169         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
170
171         final IoInputStream asyncOut = getMockedIoInputStream();
172         final IoOutputStream asyncIn = getMockedIoOutputStream();
173         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
174         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
175         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
176
177         sshConnectListener.operationComplete(connectFuture);
178         sshAuthListener.operationComplete(getSuccessAuthFuture());
179         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
180
181         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
182
183         verify(promise).setSuccess();
184         verify(ctx).fireChannelActive();
185     }
186
187     @Test
188     public void testRead() throws Exception {
189         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
190
191         final IoInputStream asyncOut = getMockedIoInputStream();
192         final IoOutputStream asyncIn = getMockedIoOutputStream();
193         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
194         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
195         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
196
197         sshConnectListener.operationComplete(connectFuture);
198         sshAuthListener.operationComplete(getSuccessAuthFuture());
199         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
200
201         verify(ctx).fireChannelRead(any(ByteBuf.class));
202     }
203
204     @Test
205     public void testReadClosed() throws Exception {
206         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
207
208         final IoInputStream asyncOut = getMockedIoInputStream();
209         final IoReadFuture mockedReadFuture = asyncOut.read(null);
210
211         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
212             @Override
213             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
214                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
215                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
216                 doReturn(true).when(asyncOut).isClosing();
217                 doReturn(true).when(asyncOut).isClosed();
218                 result.operationComplete(mockedReadFuture);
219             }
220         });
221
222         final IoOutputStream asyncIn = getMockedIoOutputStream();
223         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
224         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
225         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
226
227         sshConnectListener.operationComplete(connectFuture);
228         sshAuthListener.operationComplete(getSuccessAuthFuture());
229         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
230
231         verify(ctx).fireChannelInactive();
232     }
233
234     @Test
235     public void testReadFail() throws Exception {
236         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
237
238         final IoInputStream asyncOut = getMockedIoInputStream();
239         final IoReadFuture mockedReadFuture = asyncOut.read(null);
240
241         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
242             @Override
243             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
244                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
245                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
246                 result.operationComplete(mockedReadFuture);
247             }
248         });
249
250         final IoOutputStream asyncIn = getMockedIoOutputStream();
251         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
252         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
253         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
254
255         sshConnectListener.operationComplete(connectFuture);
256         sshAuthListener.operationComplete(getSuccessAuthFuture());
257         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
258
259         verify(ctx).fireChannelInactive();
260     }
261
262     @Test
263     public void testWrite() throws Exception {
264         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
265
266         final IoInputStream asyncOut = getMockedIoInputStream();
267         final IoOutputStream asyncIn = getMockedIoOutputStream();
268         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
269         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
270         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
271
272         sshConnectListener.operationComplete(connectFuture);
273         sshAuthListener.operationComplete(getSuccessAuthFuture());
274         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
275
276         final ChannelPromise writePromise = getMockedPromise();
277         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
278
279         verify(writePromise).setSuccess();
280     }
281
282     @Test
283     public void testWriteClosed() throws Exception {
284         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
285
286         final IoInputStream asyncOut = getMockedIoInputStream();
287         final IoOutputStream asyncIn = getMockedIoOutputStream();
288
289         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
290
291         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
292             @Override
293             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
294                 doReturn(false).when(ioWriteFuture).isWritten();
295                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
296                 doReturn(true).when(asyncIn).isClosing();
297                 doReturn(true).when(asyncIn).isClosed();
298                 result.operationComplete(ioWriteFuture);
299             }
300         });
301
302         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
303         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
304         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
305
306         sshConnectListener.operationComplete(connectFuture);
307         sshAuthListener.operationComplete(getSuccessAuthFuture());
308         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
309
310         final ChannelPromise writePromise = getMockedPromise();
311         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
312
313         verify(writePromise).setFailure(any(Throwable.class));
314     }
315
316     @Test
317     public void testWritePendingOne() throws Exception {
318         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
319
320         final IoInputStream asyncOut = getMockedIoInputStream();
321         final IoOutputStream asyncIn = getMockedIoOutputStream();
322         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
323
324         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
325         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
326         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
327
328         sshConnectListener.operationComplete(connectFuture);
329         sshAuthListener.operationComplete(getSuccessAuthFuture());
330         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
331
332         final ChannelPromise firstWritePromise = getMockedPromise();
333
334         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
335         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
336         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
337         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
338         // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
339         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
340
341         final ChannelPromise secondWritePromise = getMockedPromise();
342         // now make write throw pending exception
343         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
344         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
345
346         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
347
348         verifyZeroInteractions(firstWritePromise, secondWritePromise);
349
350         // make first write stop pending
351         firstWriteListener.operationComplete(ioWriteFuture);
352
353         // notify listener for second write that pending has ended
354         pendingListener.get().operationComplete(ioWriteFuture);
355
356         // verify both write promises successful
357         verify(firstWritePromise).setSuccess();
358         verify(secondWritePromise).setSuccess();
359     }
360
361     @Ignore("Pending queue is not limited")
362     @Test
363     public void testWritePendingMax() throws Exception {
364         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
365
366         final IoInputStream asyncOut = getMockedIoInputStream();
367         final IoOutputStream asyncIn = getMockedIoOutputStream();
368         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
369
370         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
371         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
372         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
373
374         sshConnectListener.operationComplete(connectFuture);
375         sshAuthListener.operationComplete(getSuccessAuthFuture());
376         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
377
378         final ChannelPromise firstWritePromise = getMockedPromise();
379
380         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
381         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
382         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
383
384         final ChannelPromise secondWritePromise = getMockedPromise();
385         // now make write throw pending exception
386         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
387         for (int i = 0; i < 1001; i++) {
388             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
389         }
390
391         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
392     }
393
394     @Test
395     public void testDisconnect() throws Exception {
396         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
397
398         final IoInputStream asyncOut = getMockedIoInputStream();
399         final IoOutputStream asyncIn = getMockedIoOutputStream();
400         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
401         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
402         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
403
404         sshConnectListener.operationComplete(connectFuture);
405         sshAuthListener.operationComplete(getSuccessAuthFuture());
406         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
407
408         final ChannelPromise disconnectPromise = getMockedPromise();
409         asyncSshHandler.disconnect(ctx, disconnectPromise);
410
411         verify(sshSession).close(anyBoolean());
412         verify(disconnectPromise).setSuccess();
413         verify(ctx).fireChannelInactive();
414     }
415
416     private OpenFuture getSuccessOpenFuture() {
417         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
418         doReturn(true).when(failedOpenFuture).isOpened();
419         return failedOpenFuture;
420     }
421
422     private AuthFuture getSuccessAuthFuture() {
423         final AuthFuture authFuture = mock(AuthFuture.class);
424         doReturn(true).when(authFuture).isSuccess();
425         return authFuture;
426     }
427
428     private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
429         final ConnectFuture connectFuture = mock(ConnectFuture.class);
430         doReturn(true).when(connectFuture).isConnected();
431
432         doReturn(sshSession).when(connectFuture).getSession();
433         return connectFuture;
434     }
435
436     private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
437         final ClientSession sshSession = mock(ClientSession.class);
438
439         doReturn("sshSession").when(sshSession).toString();
440         doReturn("serverVersion").when(sshSession).getServerVersion();
441         doReturn(false).when(sshSession).isClosed();
442         doReturn(false).when(sshSession).isClosing();
443         final CloseFuture closeFuture = mock(CloseFuture.class);
444         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
445             @Override
446             public void onSuccess(final SshFutureListener<CloseFuture> result) {
447                 doReturn(true).when(closeFuture).isClosed();
448                 result.operationComplete(closeFuture);
449             }
450         });
451         doReturn(closeFuture).when(sshSession).close(false);
452
453         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
454
455         return sshSession;
456     }
457
458     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
459         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
460         doReturn("subsystemChannel").when(subsystemChannel).toString();
461
462         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
463         final OpenFuture openFuture = mock(OpenFuture.class);
464
465         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
466             @Override
467             public void onSuccess(final SshFutureListener<OpenFuture> result) {
468                 sshChannelOpenListener = result;
469             }
470         });
471
472         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
473
474         doReturn(openFuture).when(subsystemChannel).open();
475         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
476         return subsystemChannel;
477     }
478
479     private IoOutputStream getMockedIoOutputStream() {
480         final IoOutputStream mock = mock(IoOutputStream.class);
481         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
482         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
483         doReturn(true).when(ioWriteFuture).isWritten();
484
485         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
486             @Override
487             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
488                 result.operationComplete(ioWriteFuture);
489             }
490         });
491
492         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
493         doReturn(false).when(mock).isClosed();
494         doReturn(false).when(mock).isClosing();
495         return mock;
496     }
497
498     private IoInputStream getMockedIoInputStream() {
499         final IoInputStream mock = mock(IoInputStream.class);
500         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
501         doReturn(null).when(ioReadFuture).getException();
502         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
503         doReturn(5).when(ioReadFuture).getRead();
504         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
505         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
506
507         // Always success for read
508         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
509             @Override
510             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
511                 result.operationComplete(ioReadFuture);
512             }
513         });
514
515         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
516         doReturn(false).when(mock).isClosed();
517         doReturn(false).when(mock).isClosing();
518         return mock;
519     }
520
521     @Test
522     public void testConnectFailOpenChannel() throws Exception {
523         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
524
525         final IoInputStream asyncOut = getMockedIoInputStream();
526         final IoOutputStream asyncIn = getMockedIoOutputStream();
527         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
528         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
529         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
530
531         sshConnectListener.operationComplete(connectFuture);
532
533         sshAuthListener.operationComplete(getSuccessAuthFuture());
534
535         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
536
537         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
538         verify(promise).setFailure(any(Throwable.class));
539     }
540
541     @Test
542     public void testConnectFailAuth() throws Exception {
543         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
544
545         final ClientSession sshSession = mock(ClientSession.class);
546         doReturn(true).when(sshSession).isClosed();
547         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
548
549         sshConnectListener.operationComplete(connectFuture);
550
551         final AuthFuture authFuture = getFailedAuthFuture();
552
553         sshAuthListener.operationComplete(authFuture);
554         verify(promise).setFailure(any(Throwable.class));
555     }
556
557     private AuthFuture getFailedAuthFuture() {
558         final AuthFuture authFuture = mock(AuthFuture.class);
559         doReturn(false).when(authFuture).isSuccess();
560         doReturn(new IllegalStateException()).when(authFuture).getException();
561         return authFuture;
562     }
563
564     private OpenFuture getFailedOpenFuture() {
565         final OpenFuture authFuture = mock(OpenFuture.class);
566         doReturn(false).when(authFuture).isOpened();
567         doReturn(new IllegalStateException()).when(authFuture).getException();
568         return authFuture;
569     }
570
571     @Test
572     public void testConnectFail() throws Exception {
573         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
574
575         final ConnectFuture connectFuture = getFailedConnectFuture();
576         sshConnectListener.operationComplete(connectFuture);
577         verify(promise).setFailure(any(Throwable.class));
578     }
579
580     private ConnectFuture getFailedConnectFuture() {
581         final ConnectFuture connectFuture = mock(ConnectFuture.class);
582         doReturn(false).when(connectFuture).isConnected();
583         doReturn(new IllegalStateException()).when(connectFuture).getException();
584         return connectFuture;
585     }
586
587     private ChannelPromise getMockedPromise() {
588         return spy(new DefaultChannelPromise(channel));
589     }
590
591     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
592
593         @Override
594         public abstract void onSuccess(final SshFutureListener<T> result);
595
596         @Override
597         public void onFailure(final Throwable t) {
598             throw new RuntimeException(t);
599         }
600     }
601 }