Replace deprecated Futures.addCallback by the newer version
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Matchers.anyObject;
14 import static org.mockito.Matchers.anyString;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyZeroInteractions;
24
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.SettableFuture;
30 import io.netty.buffer.ByteBuf;
31 import io.netty.buffer.Unpooled;
32 import io.netty.channel.Channel;
33 import io.netty.channel.ChannelFuture;
34 import io.netty.channel.ChannelHandlerContext;
35 import io.netty.channel.ChannelPromise;
36 import io.netty.channel.DefaultChannelPromise;
37 import io.netty.channel.EventLoop;
38 import java.io.IOException;
39 import java.net.SocketAddress;
40 import org.apache.sshd.ClientChannel;
41 import org.apache.sshd.ClientSession;
42 import org.apache.sshd.SshClient;
43 import org.apache.sshd.client.channel.ChannelSubsystem;
44 import org.apache.sshd.client.future.AuthFuture;
45 import org.apache.sshd.client.future.ConnectFuture;
46 import org.apache.sshd.client.future.OpenFuture;
47 import org.apache.sshd.common.future.CloseFuture;
48 import org.apache.sshd.common.future.SshFuture;
49 import org.apache.sshd.common.future.SshFutureListener;
50 import org.apache.sshd.common.io.IoInputStream;
51 import org.apache.sshd.common.io.IoOutputStream;
52 import org.apache.sshd.common.io.IoReadFuture;
53 import org.apache.sshd.common.io.IoWriteFuture;
54 import org.apache.sshd.common.util.Buffer;
55 import org.junit.After;
56 import org.junit.Before;
57 import org.junit.Ignore;
58 import org.junit.Test;
59 import org.mockito.Matchers;
60 import org.mockito.Mock;
61 import org.mockito.MockitoAnnotations;
62 import org.mockito.invocation.InvocationOnMock;
63 import org.mockito.stubbing.Answer;
64 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
65
66 public class AsyncSshHandlerTest {
67
68     @Mock
69     private SshClient sshClient;
70     @Mock
71     private AuthenticationHandler authHandler;
72     @Mock
73     private ChannelHandlerContext ctx;
74     @Mock
75     private Channel channel;
76     @Mock
77     private SocketAddress remoteAddress;
78     @Mock
79     private SocketAddress localAddress;
80     @Mock
81     private EventLoop eventLoop;
82
83     private AsyncSshHandler asyncSshHandler;
84
85     private SshFutureListener<ConnectFuture> sshConnectListener;
86     private SshFutureListener<AuthFuture> sshAuthListener;
87     private SshFutureListener<OpenFuture> sshChannelOpenListener;
88
89     private ChannelPromise promise;
90
91     @Before
92     public void setUp() throws Exception {
93         MockitoAnnotations.initMocks(this);
94         stubAuth();
95         stubSshClient();
96         stubChannel();
97         stubEventLoop();
98         stubCtx();
99         stubRemoteAddress();
100
101         promise = getMockedPromise();
102
103         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
104     }
105
106     @After
107     public void tearDown() throws Exception {
108         sshConnectListener = null;
109         sshAuthListener = null;
110         sshChannelOpenListener = null;
111         promise = null;
112         asyncSshHandler.close(ctx, getMockedPromise());
113     }
114
115     private void stubAuth() throws IOException {
116         doReturn("usr").when(authHandler).getUsername();
117
118         final AuthFuture authFuture = mock(AuthFuture.class);
119         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
120             @Override
121             public void onSuccess(final SshFutureListener<AuthFuture> result) {
122                 sshAuthListener = result;
123             }
124         }, MoreExecutors.directExecutor());
125         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
126     }
127
128     @SuppressWarnings("unchecked")
129     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
130         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
131
132         doAnswer(new Answer<Object>() {
133             @Override
134             public Object answer(final InvocationOnMock invocation) throws Throwable {
135                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
136                 return null;
137             }
138         }).when(future).addListener(any(SshFutureListener.class));
139
140         return listenerSettableFuture;
141     }
142
143     private void stubRemoteAddress() {
144         doReturn("remote").when(remoteAddress).toString();
145     }
146
147     private void stubCtx() {
148         doReturn(channel).when(ctx).channel();
149         doReturn(ctx).when(ctx).fireChannelActive();
150         doReturn(ctx).when(ctx).fireChannelInactive();
151         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
152         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
153         doReturn(getMockedPromise()).when(ctx).newPromise();
154     }
155
156     private void stubChannel() {
157         doReturn("channel").when(channel).toString();
158     }
159
160     private void stubEventLoop() {
161         doReturn(eventLoop).when(channel).eventLoop();
162         doReturn(Boolean.TRUE).when(eventLoop).inEventLoop();
163     }
164
165     private void stubSshClient() {
166         doNothing().when(sshClient).start();
167         final ConnectFuture connectFuture = mock(ConnectFuture.class);
168         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
169             @Override
170             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
171                 sshConnectListener = result;
172             }
173         }, MoreExecutors.directExecutor());
174         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
175     }
176
177     @Test
178     public void testConnectSuccess() throws Exception {
179         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
180
181         final IoInputStream asyncOut = getMockedIoInputStream();
182         final IoOutputStream asyncIn = getMockedIoOutputStream();
183         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
184         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
185         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
186
187         sshConnectListener.operationComplete(connectFuture);
188         sshAuthListener.operationComplete(getSuccessAuthFuture());
189         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
190
191         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
192
193         verify(promise).setSuccess();
194         verify(ctx).fireChannelActive();
195     }
196
197     @Test
198     public void testRead() throws Exception {
199         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
200
201         final IoInputStream asyncOut = getMockedIoInputStream();
202         final IoOutputStream asyncIn = getMockedIoOutputStream();
203         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
204         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
205         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
206
207         sshConnectListener.operationComplete(connectFuture);
208         sshAuthListener.operationComplete(getSuccessAuthFuture());
209         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
210
211         verify(ctx).fireChannelRead(any(ByteBuf.class));
212     }
213
214     @Test
215     public void testReadClosed() throws Exception {
216         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
217
218         final IoInputStream asyncOut = getMockedIoInputStream();
219         final IoReadFuture mockedReadFuture = asyncOut.read(null);
220
221         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
222             @Override
223             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
224                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
225                 doReturn(mockedReadFuture).when(mockedReadFuture)
226                         .removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
227                 doReturn(true).when(asyncOut).isClosing();
228                 doReturn(true).when(asyncOut).isClosed();
229                 result.operationComplete(mockedReadFuture);
230             }
231         }, MoreExecutors.directExecutor());
232
233         final IoOutputStream asyncIn = getMockedIoOutputStream();
234         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
235         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
236         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
237
238         sshConnectListener.operationComplete(connectFuture);
239         sshAuthListener.operationComplete(getSuccessAuthFuture());
240         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
241
242         verify(ctx).fireChannelInactive();
243     }
244
245     @Test
246     public void testReadFail() throws Exception {
247         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
248
249         final IoInputStream asyncOut = getMockedIoInputStream();
250         final IoReadFuture mockedReadFuture = asyncOut.read(null);
251
252         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
253             @Override
254             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
255                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
256                 doReturn(mockedReadFuture).when(mockedReadFuture)
257                         .removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
258                 result.operationComplete(mockedReadFuture);
259             }
260         }, MoreExecutors.directExecutor());
261
262         final IoOutputStream asyncIn = getMockedIoOutputStream();
263         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
264         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
265         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
266
267         sshConnectListener.operationComplete(connectFuture);
268         sshAuthListener.operationComplete(getSuccessAuthFuture());
269         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
270
271         verify(ctx).fireChannelInactive();
272     }
273
274     @Test
275     public void testWrite() throws Exception {
276         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
277
278         final IoInputStream asyncOut = getMockedIoInputStream();
279         final IoOutputStream asyncIn = getMockedIoOutputStream();
280         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
281         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
282         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
283
284         sshConnectListener.operationComplete(connectFuture);
285         sshAuthListener.operationComplete(getSuccessAuthFuture());
286         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
287
288         final ChannelPromise writePromise = getMockedPromise();
289         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
290
291         verify(writePromise).setSuccess();
292     }
293
294     @Test
295     public void testWriteClosed() throws Exception {
296         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
297
298         final IoInputStream asyncOut = getMockedIoInputStream();
299         final IoOutputStream asyncIn = getMockedIoOutputStream();
300
301         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
302
303         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
304             @Override
305             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
306                 doReturn(false).when(ioWriteFuture).isWritten();
307                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
308                 doReturn(true).when(asyncIn).isClosing();
309                 doReturn(true).when(asyncIn).isClosed();
310                 result.operationComplete(ioWriteFuture);
311             }
312         }, MoreExecutors.directExecutor());
313
314         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
315         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
316         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
317
318         sshConnectListener.operationComplete(connectFuture);
319         sshAuthListener.operationComplete(getSuccessAuthFuture());
320         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
321
322         final ChannelPromise writePromise = getMockedPromise();
323         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
324
325         verify(writePromise).setFailure(any(Throwable.class));
326     }
327
328     @Test
329     public void testWritePendingOne() throws Exception {
330         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
331
332         final IoInputStream asyncOut = getMockedIoInputStream();
333         final IoOutputStream asyncIn = getMockedIoOutputStream();
334         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
335
336         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
337         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
338         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
339
340         sshConnectListener.operationComplete(connectFuture);
341         sshAuthListener.operationComplete(getSuccessAuthFuture());
342         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
343
344         final ChannelPromise firstWritePromise = getMockedPromise();
345
346         // intercept listener for first write,
347         // so we can invoke successful write later thus simulate pending of the first write
348         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
349                 stubAddListener(ioWriteFuture);
350         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
351         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
352         // intercept second listener,
353         // this is the listener for pending write for the pending write to know when pending state ended
354         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
355
356         final ChannelPromise secondWritePromise = getMockedPromise();
357         // now make write throw pending exception
358         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
359         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
360
361         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
362
363         verifyZeroInteractions(firstWritePromise, secondWritePromise);
364
365         // make first write stop pending
366         firstWriteListener.operationComplete(ioWriteFuture);
367
368         // notify listener for second write that pending has ended
369         pendingListener.get().operationComplete(ioWriteFuture);
370
371         // verify both write promises successful
372         verify(firstWritePromise).setSuccess();
373         verify(secondWritePromise).setSuccess();
374     }
375
376     @Ignore("Pending queue is not limited")
377     @Test
378     public void testWritePendingMax() throws Exception {
379         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
380
381         final IoInputStream asyncOut = getMockedIoInputStream();
382         final IoOutputStream asyncIn = getMockedIoOutputStream();
383         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
384
385         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
386         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
387         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
388
389         sshConnectListener.operationComplete(connectFuture);
390         sshAuthListener.operationComplete(getSuccessAuthFuture());
391         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
392
393         final ChannelPromise firstWritePromise = getMockedPromise();
394
395         // intercept listener for first write,
396         // so we can invoke successful write later thus simulate pending of the first write
397         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
398                 stubAddListener(ioWriteFuture);
399         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
400
401         final ChannelPromise secondWritePromise = getMockedPromise();
402         // now make write throw pending exception
403         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
404         for (int i = 0; i < 1001; i++) {
405             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
406         }
407
408         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
409     }
410
411     @Test
412     public void testDisconnect() throws Exception {
413         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
414
415         final IoInputStream asyncOut = getMockedIoInputStream();
416         final IoOutputStream asyncIn = getMockedIoOutputStream();
417         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
418         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
419         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
420
421         sshConnectListener.operationComplete(connectFuture);
422         sshAuthListener.operationComplete(getSuccessAuthFuture());
423         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
424
425         final ChannelPromise disconnectPromise = getMockedPromise();
426         asyncSshHandler.disconnect(ctx, disconnectPromise);
427
428         verify(sshSession).close(anyBoolean());
429         verify(disconnectPromise).setSuccess();
430         verify(ctx).fireChannelInactive();
431     }
432
433     private static OpenFuture getSuccessOpenFuture() {
434         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
435         doReturn(true).when(failedOpenFuture).isOpened();
436         return failedOpenFuture;
437     }
438
439     private static AuthFuture getSuccessAuthFuture() {
440         final AuthFuture authFuture = mock(AuthFuture.class);
441         doReturn(true).when(authFuture).isSuccess();
442         return authFuture;
443     }
444
445     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
446         final ConnectFuture connectFuture = mock(ConnectFuture.class);
447         doReturn(true).when(connectFuture).isConnected();
448
449         doReturn(sshSession).when(connectFuture).getSession();
450         return connectFuture;
451     }
452
453     private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
454         final ClientSession sshSession = mock(ClientSession.class);
455
456         doReturn("sshSession").when(sshSession).toString();
457         doReturn("serverVersion").when(sshSession).getServerVersion();
458         doReturn(false).when(sshSession).isClosed();
459         doReturn(false).when(sshSession).isClosing();
460         final CloseFuture closeFuture = mock(CloseFuture.class);
461         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
462             @Override
463             public void onSuccess(final SshFutureListener<CloseFuture> result) {
464                 doReturn(true).when(closeFuture).isClosed();
465                 result.operationComplete(closeFuture);
466             }
467         }, MoreExecutors.directExecutor());
468         doReturn(closeFuture).when(sshSession).close(false);
469
470         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
471
472         return sshSession;
473     }
474
475     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
476                                                        final IoOutputStream asyncIn) throws IOException {
477         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
478         doReturn("subsystemChannel").when(subsystemChannel).toString();
479
480         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
481         final OpenFuture openFuture = mock(OpenFuture.class);
482
483         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
484             @Override
485             public void onSuccess(final SshFutureListener<OpenFuture> result) {
486                 sshChannelOpenListener = result;
487             }
488         }, MoreExecutors.directExecutor());
489
490         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
491
492         doReturn(openFuture).when(subsystemChannel).open();
493         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
494         return subsystemChannel;
495     }
496
497     private static IoOutputStream getMockedIoOutputStream() {
498         final IoOutputStream mock = mock(IoOutputStream.class);
499         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
500         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
501         doReturn(true).when(ioWriteFuture).isWritten();
502
503         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
504             @Override
505             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
506                 result.operationComplete(ioWriteFuture);
507             }
508         }, MoreExecutors.directExecutor());
509
510         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
511         doReturn(false).when(mock).isClosed();
512         doReturn(false).when(mock).isClosing();
513         return mock;
514     }
515
516     private static IoInputStream getMockedIoInputStream() {
517         final IoInputStream mock = mock(IoInputStream.class);
518         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
519         doReturn(null).when(ioReadFuture).getException();
520         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
521         doReturn(5).when(ioReadFuture).getRead();
522         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
523         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
524
525         // Always success for read
526         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
527             @Override
528             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
529                 result.operationComplete(ioReadFuture);
530             }
531         }, MoreExecutors.directExecutor());
532
533         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
534         doReturn(false).when(mock).isClosed();
535         doReturn(false).when(mock).isClosing();
536         return mock;
537     }
538
539     @Test
540     public void testConnectFailOpenChannel() throws Exception {
541         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
542
543         final IoInputStream asyncOut = getMockedIoInputStream();
544         final IoOutputStream asyncIn = getMockedIoOutputStream();
545         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
546         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
547         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
548
549         sshConnectListener.operationComplete(connectFuture);
550
551         sshAuthListener.operationComplete(getSuccessAuthFuture());
552
553         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
554
555         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
556         verify(promise).setFailure(any(Throwable.class));
557     }
558
559     @Test
560     public void testConnectFailAuth() throws Exception {
561         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
562
563         final ClientSession sshSession = mock(ClientSession.class);
564         doReturn(true).when(sshSession).isClosed();
565         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
566
567         sshConnectListener.operationComplete(connectFuture);
568
569         final AuthFuture authFuture = getFailedAuthFuture();
570
571         sshAuthListener.operationComplete(authFuture);
572         verify(promise).setFailure(any(Throwable.class));
573     }
574
575     private static AuthFuture getFailedAuthFuture() {
576         final AuthFuture authFuture = mock(AuthFuture.class);
577         doReturn(false).when(authFuture).isSuccess();
578         doReturn(new IllegalStateException()).when(authFuture).getException();
579         return authFuture;
580     }
581
582     private static OpenFuture getFailedOpenFuture() {
583         final OpenFuture authFuture = mock(OpenFuture.class);
584         doReturn(false).when(authFuture).isOpened();
585         doReturn(new IllegalStateException()).when(authFuture).getException();
586         return authFuture;
587     }
588
589     @Test
590     public void testConnectFail() throws Exception {
591         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
592
593         final ConnectFuture connectFuture = getFailedConnectFuture();
594         sshConnectListener.operationComplete(connectFuture);
595         verify(promise).setFailure(any(Throwable.class));
596     }
597
598     private static ConnectFuture getFailedConnectFuture() {
599         final ConnectFuture connectFuture = mock(ConnectFuture.class);
600         doReturn(false).when(connectFuture).isConnected();
601         doReturn(new IllegalStateException()).when(connectFuture).getException();
602         return connectFuture;
603     }
604
605     private ChannelPromise getMockedPromise() {
606         return spy(new DefaultChannelPromise(channel));
607     }
608
609     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
610             implements FutureCallback<SshFutureListener<T>> {
611
612         @Override
613         public abstract void onSuccess(SshFutureListener<T> result);
614
615         @Override
616         public void onFailure(final Throwable throwable) {
617             throw new RuntimeException(throwable);
618         }
619     }
620 }