BUG-1521 Netconf-netty-util missing unit tests
[controller.git] / opendaylight / netconf / netconf-netty-util / src / test / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static org.junit.Assert.fail;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyBoolean;
14 import static org.mockito.Matchers.anyObject;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24 import static org.mockito.Mockito.verifyZeroInteractions;
25
26 import io.netty.buffer.ByteBuf;
27 import io.netty.buffer.Unpooled;
28 import java.io.IOException;
29 import java.net.SocketAddress;
30
31 import java.nio.channels.WritePendingException;
32 import org.apache.sshd.ClientChannel;
33 import org.apache.sshd.ClientSession;
34 import org.apache.sshd.SshClient;
35 import org.apache.sshd.client.channel.ChannelSubsystem;
36 import org.apache.sshd.client.future.AuthFuture;
37 import org.apache.sshd.client.future.ConnectFuture;
38 import org.apache.sshd.client.future.OpenFuture;
39 import org.apache.sshd.common.future.CloseFuture;
40 import org.apache.sshd.common.future.SshFuture;
41 import org.apache.sshd.common.future.SshFutureListener;
42 import org.apache.sshd.common.io.IoInputStream;
43 import org.apache.sshd.common.io.IoOutputStream;
44 import org.apache.sshd.common.io.IoReadFuture;
45 import org.apache.sshd.common.io.IoWriteFuture;
46 import org.apache.sshd.common.util.Buffer;
47 import org.junit.After;
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.mockito.Matchers;
51 import org.mockito.Mock;
52 import org.mockito.MockitoAnnotations;
53 import org.mockito.invocation.InvocationOnMock;
54 import org.mockito.stubbing.Answer;
55 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
56
57 import com.google.common.util.concurrent.FutureCallback;
58 import com.google.common.util.concurrent.Futures;
59 import com.google.common.util.concurrent.ListenableFuture;
60 import com.google.common.util.concurrent.SettableFuture;
61
62 import io.netty.channel.Channel;
63 import io.netty.channel.ChannelHandlerContext;
64 import io.netty.channel.ChannelPromise;
65
66 public class AsyncSshHandlerTest {
67
68     @Mock
69     private SshClient sshClient;
70     @Mock
71     private AuthenticationHandler authHandler;
72     @Mock
73     private ChannelHandlerContext ctx;
74     @Mock
75     private Channel channel;
76     @Mock
77     private SocketAddress remoteAddress;
78     @Mock
79     private SocketAddress localAddress;
80
81     private AsyncSshHandler asyncSshHandler;
82
83     private SshFutureListener<ConnectFuture> sshConnectListener;
84     private SshFutureListener<AuthFuture> sshAuthListener;
85     private SshFutureListener<OpenFuture> sshChannelOpenListener;
86
87     private ChannelPromise promise;
88
89     @Before
90     public void setUp() throws Exception {
91         MockitoAnnotations.initMocks(this);
92         stubAuth();
93         stubSshClient();
94         stubChannel();
95         stubCtx();
96         stubRemoteAddress();
97
98         promise = getMockedPromise();
99
100         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
101     }
102
103     @After
104     public void tearDown() throws Exception {
105         sshConnectListener = null;
106         sshAuthListener = null;
107         sshChannelOpenListener = null;
108         promise = null;
109         asyncSshHandler.close(ctx, getMockedPromise());
110     }
111
112     private void stubAuth() throws IOException {
113         doReturn("usr").when(authHandler).getUsername();
114
115         final AuthFuture authFuture = mock(AuthFuture.class);
116         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
117             @Override
118             public void onSuccess(final SshFutureListener<AuthFuture> result) {
119                 sshAuthListener = result;
120             }
121         });
122         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
123     }
124
125     @SuppressWarnings("unchecked")
126     private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
127         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
128
129         doAnswer(new Answer() {
130             @Override
131             public Object answer(final InvocationOnMock invocation) throws Throwable {
132                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
133                 return null;
134             }
135         }).when(future).addListener(any(SshFutureListener.class));
136
137         return listenerSettableFuture;
138     }
139
140     private void stubRemoteAddress() {
141         doReturn("remote").when(remoteAddress).toString();
142     }
143
144     private void stubCtx() {
145         doReturn(channel).when(ctx).channel();
146         doReturn(ctx).when(ctx).fireChannelActive();
147         doReturn(ctx).when(ctx).fireChannelInactive();
148         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
149         doReturn(getMockedPromise()).when(ctx).newPromise();
150     }
151
152     private void stubChannel() {
153         doReturn("channel").when(channel).toString();
154     }
155
156     private void stubSshClient() {
157         doNothing().when(sshClient).start();
158         final ConnectFuture connectFuture = mock(ConnectFuture.class);
159         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
160             @Override
161             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
162                 sshConnectListener = result;
163             }
164         });
165         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
166     }
167
168     @Test
169     public void testConnectSuccess() throws Exception {
170         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
171
172         final IoInputStream asyncOut = getMockedIoInputStream();
173         final IoOutputStream asyncIn = getMockedIoOutputStream();
174         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
175         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
176         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
177
178         sshConnectListener.operationComplete(connectFuture);
179         sshAuthListener.operationComplete(getSuccessAuthFuture());
180         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
181
182         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
183
184         verify(promise).setSuccess();
185         verifyNoMoreInteractions(promise);
186         verify(ctx).fireChannelActive();
187     }
188
189     @Test
190     public void testRead() throws Exception {
191         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
192
193         final IoInputStream asyncOut = getMockedIoInputStream();
194         final IoOutputStream asyncIn = getMockedIoOutputStream();
195         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
198
199         sshConnectListener.operationComplete(connectFuture);
200         sshAuthListener.operationComplete(getSuccessAuthFuture());
201         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
202
203         verify(ctx).fireChannelRead(any(ByteBuf.class));
204     }
205
206     @Test
207     public void testReadClosed() throws Exception {
208         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
209
210         final IoInputStream asyncOut = getMockedIoInputStream();
211         final IoReadFuture mockedReadFuture = asyncOut.read(null);
212
213         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
214             @Override
215             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
216                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
217                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
218                 doReturn(true).when(asyncOut).isClosing();
219                 doReturn(true).when(asyncOut).isClosed();
220                 result.operationComplete(mockedReadFuture);
221             }
222         });
223
224         final IoOutputStream asyncIn = getMockedIoOutputStream();
225         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
226         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
227         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
228
229         sshConnectListener.operationComplete(connectFuture);
230         sshAuthListener.operationComplete(getSuccessAuthFuture());
231         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
232
233         verify(ctx).fireChannelInactive();
234     }
235
236     @Test
237     public void testReadFail() throws Exception {
238         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
239
240         final IoInputStream asyncOut = getMockedIoInputStream();
241         final IoReadFuture mockedReadFuture = asyncOut.read(null);
242
243         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
244             @Override
245             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
246                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
247                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
248                 result.operationComplete(mockedReadFuture);
249             }
250         });
251
252         final IoOutputStream asyncIn = getMockedIoOutputStream();
253         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
254         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
255         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
256
257         sshConnectListener.operationComplete(connectFuture);
258         sshAuthListener.operationComplete(getSuccessAuthFuture());
259         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
260
261         verify(ctx).fireChannelInactive();
262     }
263
264     @Test
265     public void testWrite() throws Exception {
266         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
267
268         final IoInputStream asyncOut = getMockedIoInputStream();
269         final IoOutputStream asyncIn = getMockedIoOutputStream();
270         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
271         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
272         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
273
274         sshConnectListener.operationComplete(connectFuture);
275         sshAuthListener.operationComplete(getSuccessAuthFuture());
276         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
277
278         final ChannelPromise writePromise = getMockedPromise();
279         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
280
281         verify(writePromise).setSuccess();
282     }
283
284     @Test
285     public void testWriteClosed() throws Exception {
286         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
287
288         final IoInputStream asyncOut = getMockedIoInputStream();
289         final IoOutputStream asyncIn = getMockedIoOutputStream();
290
291         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
292
293         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
294             @Override
295             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
296                 doReturn(false).when(ioWriteFuture).isWritten();
297                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
298                 doReturn(true).when(asyncIn).isClosing();
299                 doReturn(true).when(asyncIn).isClosed();
300                 result.operationComplete(ioWriteFuture);
301             }
302         });
303
304         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
305         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
306         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
307
308         sshConnectListener.operationComplete(connectFuture);
309         sshAuthListener.operationComplete(getSuccessAuthFuture());
310         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
311
312         final ChannelPromise writePromise = getMockedPromise();
313         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
314
315         verify(writePromise).setFailure(any(Throwable.class));
316     }
317
318     @Test
319     public void testWritePendingOne() throws Exception {
320         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
321
322         final IoInputStream asyncOut = getMockedIoInputStream();
323         final IoOutputStream asyncIn = getMockedIoOutputStream();
324         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
325
326         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
327         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
328         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
329
330         sshConnectListener.operationComplete(connectFuture);
331         sshAuthListener.operationComplete(getSuccessAuthFuture());
332         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
333
334         final ChannelPromise firstWritePromise = getMockedPromise();
335
336         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
337         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
338         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
339         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
340         // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
341         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
342
343         final ChannelPromise secondWritePromise = getMockedPromise();
344         // now make write throw pending exception
345         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
346         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
347
348         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
349
350         verifyZeroInteractions(firstWritePromise, secondWritePromise);
351
352         // make first write stop pending
353         firstWriteListener.operationComplete(ioWriteFuture);
354         // intercept third listener, this is regular listener for second write to determine success or failure
355         final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
356
357         // notify listener for second write that pending has ended
358         pendingListener.get().operationComplete(ioWriteFuture);
359         // Notify third listener (regular listener for second write) that second write succeeded
360         afterPendingListener.get().operationComplete(ioWriteFuture);
361
362         // verify both write promises successful
363         verify(firstWritePromise).setSuccess();
364         verify(secondWritePromise).setSuccess();
365     }
366
367     @Test
368     public void testWritePendingMax() throws Exception {
369         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
370
371         final IoInputStream asyncOut = getMockedIoInputStream();
372         final IoOutputStream asyncIn = getMockedIoOutputStream();
373         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
374
375         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
376         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
377         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
378
379         sshConnectListener.operationComplete(connectFuture);
380         sshAuthListener.operationComplete(getSuccessAuthFuture());
381         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
382
383         final ChannelPromise firstWritePromise = getMockedPromise();
384
385         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
386         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
387         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
388
389         final ChannelPromise secondWritePromise = getMockedPromise();
390         // now make write throw pending exception
391         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
392         for (int i = 0; i < 1000; i++) {
393             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
394         }
395
396         verify(ctx).fireChannelInactive();
397     }
398
399     @Test
400     public void testDisconnect() throws Exception {
401         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
402
403         final IoInputStream asyncOut = getMockedIoInputStream();
404         final IoOutputStream asyncIn = getMockedIoOutputStream();
405         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
406         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
407         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
408
409         sshConnectListener.operationComplete(connectFuture);
410         sshAuthListener.operationComplete(getSuccessAuthFuture());
411         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
412
413         final ChannelPromise disconnectPromise = getMockedPromise();
414         asyncSshHandler.disconnect(ctx, disconnectPromise);
415
416         verify(sshSession).close(anyBoolean());
417         verify(disconnectPromise).setSuccess();
418         verify(ctx).fireChannelInactive();
419     }
420
421     private OpenFuture getSuccessOpenFuture() {
422         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
423         doReturn(true).when(failedOpenFuture).isOpened();
424         return failedOpenFuture;
425     }
426
427     private AuthFuture getSuccessAuthFuture() {
428         final AuthFuture authFuture = mock(AuthFuture.class);
429         doReturn(true).when(authFuture).isSuccess();
430         return authFuture;
431     }
432
433     private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
434         final ConnectFuture connectFuture = mock(ConnectFuture.class);
435         doReturn(true).when(connectFuture).isConnected();
436
437         doReturn(sshSession).when(connectFuture).getSession();
438         return connectFuture;
439     }
440
441     private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
442         final ClientSession sshSession = mock(ClientSession.class);
443
444         doReturn("sshSession").when(sshSession).toString();
445         doReturn("serverVersion").when(sshSession).getServerVersion();
446         doReturn(false).when(sshSession).isClosed();
447         doReturn(false).when(sshSession).isClosing();
448         final CloseFuture closeFuture = mock(CloseFuture.class);
449         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
450             @Override
451             public void onSuccess(final SshFutureListener<CloseFuture> result) {
452                 doReturn(true).when(closeFuture).isClosed();
453                 result.operationComplete(closeFuture);
454             }
455         });
456         doReturn(closeFuture).when(sshSession).close(false);
457
458         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
459
460         return sshSession;
461     }
462
463     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
464         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
465         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
466         final OpenFuture openFuture = mock(OpenFuture.class);
467
468         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
469             @Override
470             public void onSuccess(final SshFutureListener<OpenFuture> result) {
471                 sshChannelOpenListener = result;
472             }
473         });
474
475         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
476
477         doReturn(openFuture).when(subsystemChannel).open();
478         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
479         return subsystemChannel;
480     }
481
482     private IoOutputStream getMockedIoOutputStream() {
483         final IoOutputStream mock = mock(IoOutputStream.class);
484         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
485         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
486         doReturn(true).when(ioWriteFuture).isWritten();
487
488         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
489             @Override
490             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
491                 result.operationComplete(ioWriteFuture);
492             }
493         });
494
495         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
496         doReturn(false).when(mock).isClosed();
497         doReturn(false).when(mock).isClosing();
498         return mock;
499     }
500
501     private IoInputStream getMockedIoInputStream() {
502         final IoInputStream mock = mock(IoInputStream.class);
503         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
504         doReturn(null).when(ioReadFuture).getException();
505         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
506         doReturn(5).when(ioReadFuture).getRead();
507         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
508         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
509
510         // Always success for read
511         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
512             @Override
513             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
514                 result.operationComplete(ioReadFuture);
515             }
516         });
517
518         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
519         doReturn(false).when(mock).isClosed();
520         doReturn(false).when(mock).isClosing();
521         return mock;
522     }
523
524     @Test
525     public void testConnectFailOpenChannel() throws Exception {
526         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
527
528         final IoInputStream asyncOut = getMockedIoInputStream();
529         final IoOutputStream asyncIn = getMockedIoOutputStream();
530         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
531         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
532         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
533
534         sshConnectListener.operationComplete(connectFuture);
535
536         sshAuthListener.operationComplete(getSuccessAuthFuture());
537
538         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
539
540         try {
541             sshChannelOpenListener.operationComplete(getFailedOpenFuture());
542             fail("Exception expected");
543         } catch (final Exception e) {
544             verify(promise).setFailure(any(Throwable.class));
545             verifyNoMoreInteractions(promise);
546             // TODO should ctx.channelInactive be called if we throw exception ?
547         }
548     }
549
550     @Test
551     public void testConnectFailAuth() throws Exception {
552         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
553
554         final ClientSession sshSession = mock(ClientSession.class);
555         doReturn(true).when(sshSession).isClosed();
556         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
557
558         sshConnectListener.operationComplete(connectFuture);
559
560         final AuthFuture authFuture = getFailedAuthFuture();
561
562         try {
563             sshAuthListener.operationComplete(authFuture);
564             fail("Exception expected");
565         } catch (final Exception e) {
566             verify(promise).setFailure(any(Throwable.class));
567             verifyNoMoreInteractions(promise);
568             // TODO should ctx.channelInactive be called ?
569         }
570     }
571
572     private AuthFuture getFailedAuthFuture() {
573         final AuthFuture authFuture = mock(AuthFuture.class);
574         doReturn(false).when(authFuture).isSuccess();
575         doReturn(new IllegalStateException()).when(authFuture).getException();
576         return authFuture;
577     }
578
579     private OpenFuture getFailedOpenFuture() {
580         final OpenFuture authFuture = mock(OpenFuture.class);
581         doReturn(false).when(authFuture).isOpened();
582         doReturn(new IllegalStateException()).when(authFuture).getException();
583         return authFuture;
584     }
585
586     @Test
587     public void testConnectFail() throws Exception {
588         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
589
590         final ConnectFuture connectFuture = getFailedConnectFuture();
591         try {
592             sshConnectListener.operationComplete(connectFuture);
593             fail("Exception expected");
594         } catch (final Exception e) {
595             verify(promise).setFailure(any(Throwable.class));
596             verifyNoMoreInteractions(promise);
597             // TODO should ctx.channelInactive be called ?
598         }
599     }
600
601     private ConnectFuture getFailedConnectFuture() {
602         final ConnectFuture connectFuture = mock(ConnectFuture.class);
603         doReturn(false).when(connectFuture).isConnected();
604         doReturn(new IllegalStateException()).when(connectFuture).getException();
605         return connectFuture;
606     }
607
608     private ChannelPromise getMockedPromise() {
609         final ChannelPromise promise = mock(ChannelPromise.class);
610         doReturn(promise).when(promise).setSuccess();
611         doReturn(promise).when(promise).setFailure(any(Throwable.class));
612         return promise;
613     }
614
615     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
616
617         @Override
618         public abstract void onSuccess(final SshFutureListener<T> result);
619
620         @Override
621         public void onFailure(final Throwable t) {
622             throw new RuntimeException(t);
623         }
624     }
625 }