Integrate MRI projects for Neon
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
9
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.anyBoolean;
12 import static org.mockito.ArgumentMatchers.anyObject;
13 import static org.mockito.ArgumentMatchers.anyString;
14 import static org.mockito.Mockito.doAnswer;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.mockito.Mockito.verifyZeroInteractions;
23
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.SettableFuture;
29 import io.netty.buffer.ByteBuf;
30 import io.netty.buffer.Unpooled;
31 import io.netty.channel.Channel;
32 import io.netty.channel.ChannelFuture;
33 import io.netty.channel.ChannelHandlerContext;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.DefaultChannelPromise;
36 import io.netty.channel.EventLoop;
37 import java.io.IOException;
38 import java.net.SocketAddress;
39 import org.apache.sshd.client.SshClient;
40 import org.apache.sshd.client.channel.ChannelSubsystem;
41 import org.apache.sshd.client.channel.ClientChannel;
42 import org.apache.sshd.client.future.AuthFuture;
43 import org.apache.sshd.client.future.ConnectFuture;
44 import org.apache.sshd.client.future.OpenFuture;
45 import org.apache.sshd.client.session.ClientSession;
46 import org.apache.sshd.common.future.CloseFuture;
47 import org.apache.sshd.common.future.SshFuture;
48 import org.apache.sshd.common.future.SshFutureListener;
49 import org.apache.sshd.common.io.IoInputStream;
50 import org.apache.sshd.common.io.IoOutputStream;
51 import org.apache.sshd.common.io.IoReadFuture;
52 import org.apache.sshd.common.io.IoWriteFuture;
53 import org.apache.sshd.common.util.buffer.Buffer;
54 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
55 import org.junit.After;
56 import org.junit.Before;
57 import org.junit.Ignore;
58 import org.junit.Test;
59 import org.mockito.Mock;
60 import org.mockito.MockitoAnnotations;
61 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
62
63 public class AsyncSshHandlerTest {
64
65     @Mock
66     private SshClient sshClient;
67     @Mock
68     private AuthenticationHandler authHandler;
69     @Mock
70     private ChannelHandlerContext ctx;
71     @Mock
72     private Channel channel;
73     @Mock
74     private SocketAddress remoteAddress;
75     @Mock
76     private SocketAddress localAddress;
77     @Mock
78     private EventLoop eventLoop;
79
80     private AsyncSshHandler asyncSshHandler;
81
82     private SshFutureListener<ConnectFuture> sshConnectListener;
83     private SshFutureListener<AuthFuture> sshAuthListener;
84     private SshFutureListener<OpenFuture> sshChannelOpenListener;
85
86     private ChannelPromise promise;
87
88     @Before
89     public void setUp() throws Exception {
90         MockitoAnnotations.initMocks(this);
91         stubAuth();
92         stubSshClient();
93         stubChannel();
94         stubEventLoop();
95         stubCtx();
96         stubRemoteAddress();
97
98         promise = getMockedPromise();
99
100         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
101     }
102
103     @After
104     public void tearDown() throws Exception {
105         sshConnectListener = null;
106         sshAuthListener = null;
107         sshChannelOpenListener = null;
108         promise = null;
109         asyncSshHandler.close(ctx, getMockedPromise());
110     }
111
112     private void stubAuth() throws IOException {
113         doReturn("usr").when(authHandler).getUsername();
114
115         final AuthFuture authFuture = mock(AuthFuture.class);
116         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
117             @Override
118             public void onSuccess(final SshFutureListener<AuthFuture> result) {
119                 sshAuthListener = result;
120             }
121         }, MoreExecutors.directExecutor());
122         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
123     }
124
125     @SuppressWarnings("unchecked")
126     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
127         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
128
129         doAnswer(invocation -> {
130             listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
131             return null;
132         }).when(future).addListener(any(SshFutureListener.class));
133
134         return listenerSettableFuture;
135     }
136
137     private void stubRemoteAddress() {
138         doReturn("remote").when(remoteAddress).toString();
139     }
140
141     private void stubCtx() {
142         doReturn(channel).when(ctx).channel();
143         doReturn(ctx).when(ctx).fireChannelActive();
144         doReturn(ctx).when(ctx).fireChannelInactive();
145         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
146         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
147         doReturn(getMockedPromise()).when(ctx).newPromise();
148     }
149
150     private void stubChannel() {
151         doReturn("channel").when(channel).toString();
152     }
153
154     private void stubEventLoop() {
155         doReturn(eventLoop).when(channel).eventLoop();
156         doReturn(Boolean.TRUE).when(eventLoop).inEventLoop();
157     }
158
159     private void stubSshClient() throws IOException {
160         doNothing().when(sshClient).start();
161         final ConnectFuture connectFuture = mock(ConnectFuture.class);
162         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
163             @Override
164             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
165                 sshConnectListener = result;
166             }
167         }, MoreExecutors.directExecutor());
168         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
169     }
170
171     @Test
172     public void testConnectSuccess() throws Exception {
173         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
174
175         final IoInputStream asyncOut = getMockedIoInputStream();
176         final IoOutputStream asyncIn = getMockedIoOutputStream();
177         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
178         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
179         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
180
181         sshConnectListener.operationComplete(connectFuture);
182         sshAuthListener.operationComplete(getSuccessAuthFuture());
183         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
184
185         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
186
187         verify(promise).setSuccess();
188         verify(ctx).fireChannelActive();
189     }
190
191     @Test
192     public void testRead() throws Exception {
193         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
194
195         final IoInputStream asyncOut = getMockedIoInputStream();
196         final IoOutputStream asyncIn = getMockedIoOutputStream();
197         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
198         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
199         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
200
201         sshConnectListener.operationComplete(connectFuture);
202         sshAuthListener.operationComplete(getSuccessAuthFuture());
203         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
204
205         verify(ctx).fireChannelRead(any(ByteBuf.class));
206     }
207
208     @Test
209     public void testReadClosed() throws Exception {
210         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
211
212         final IoInputStream asyncOut = getMockedIoInputStream();
213         final IoReadFuture mockedReadFuture = asyncOut.read(null);
214
215         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
216             @Override
217             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
218                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
219                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(any());
220                 doReturn(true).when(asyncOut).isClosing();
221                 doReturn(true).when(asyncOut).isClosed();
222                 result.operationComplete(mockedReadFuture);
223             }
224         }, MoreExecutors.directExecutor());
225
226         final IoOutputStream asyncIn = getMockedIoOutputStream();
227         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
228         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
229         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
230
231         sshConnectListener.operationComplete(connectFuture);
232         sshAuthListener.operationComplete(getSuccessAuthFuture());
233         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
234
235         verify(ctx).fireChannelInactive();
236     }
237
238     @Test
239     public void testReadFail() throws Exception {
240         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
241
242         final IoInputStream asyncOut = getMockedIoInputStream();
243         final IoReadFuture mockedReadFuture = asyncOut.read(null);
244
245         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
246             @Override
247             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
248                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
249                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(any());
250                 result.operationComplete(mockedReadFuture);
251             }
252         }, MoreExecutors.directExecutor());
253
254         final IoOutputStream asyncIn = getMockedIoOutputStream();
255         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
256         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
257         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
258
259         sshConnectListener.operationComplete(connectFuture);
260         sshAuthListener.operationComplete(getSuccessAuthFuture());
261         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
262
263         verify(ctx).fireChannelInactive();
264     }
265
266     @Test
267     public void testWrite() throws Exception {
268         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
269
270         final IoInputStream asyncOut = getMockedIoInputStream();
271         final IoOutputStream asyncIn = getMockedIoOutputStream();
272         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
273         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
274         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
275
276         sshConnectListener.operationComplete(connectFuture);
277         sshAuthListener.operationComplete(getSuccessAuthFuture());
278         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
279
280         final ChannelPromise writePromise = getMockedPromise();
281         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
282
283         verify(writePromise).setSuccess();
284     }
285
286     @Test
287     public void testWriteClosed() throws Exception {
288         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
289
290         final IoInputStream asyncOut = getMockedIoInputStream();
291         final IoOutputStream asyncIn = getMockedIoOutputStream();
292
293         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
294
295         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
296             @Override
297             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
298                 doReturn(false).when(ioWriteFuture).isWritten();
299                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
300                 doReturn(true).when(asyncIn).isClosing();
301                 doReturn(true).when(asyncIn).isClosed();
302                 result.operationComplete(ioWriteFuture);
303             }
304         }, MoreExecutors.directExecutor());
305
306         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
307         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
308         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
309
310         sshConnectListener.operationComplete(connectFuture);
311         sshAuthListener.operationComplete(getSuccessAuthFuture());
312         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
313
314         final ChannelPromise writePromise = getMockedPromise();
315         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
316
317         verify(writePromise).setFailure(any(Throwable.class));
318     }
319
320     @Test
321     public void testWritePendingOne() throws Exception {
322         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
323
324         final IoInputStream asyncOut = getMockedIoInputStream();
325         final IoOutputStream asyncIn = getMockedIoOutputStream();
326         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
327
328         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
329         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
330         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
331
332         sshConnectListener.operationComplete(connectFuture);
333         sshAuthListener.operationComplete(getSuccessAuthFuture());
334         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
335
336         final ChannelPromise firstWritePromise = getMockedPromise();
337
338         // intercept listener for first write,
339         // so we can invoke successful write later thus simulate pending of the first write
340         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
341                 stubAddListener(ioWriteFuture);
342         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
343         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
344         // intercept second listener,
345         // this is the listener for pending write for the pending write to know when pending state ended
346         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
347
348         final ChannelPromise secondWritePromise = getMockedPromise();
349         // now make write throw pending exception
350         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).writePacket(any(Buffer.class));
351         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
352
353         doReturn(ioWriteFuture).when(asyncIn).writePacket(any(Buffer.class));
354
355         verifyZeroInteractions(firstWritePromise, secondWritePromise);
356
357         // make first write stop pending
358         firstWriteListener.operationComplete(ioWriteFuture);
359
360         // notify listener for second write that pending has ended
361         pendingListener.get().operationComplete(ioWriteFuture);
362
363         // verify both write promises successful
364         verify(firstWritePromise).setSuccess();
365         verify(secondWritePromise).setSuccess();
366     }
367
368     @Ignore("Pending queue is not limited")
369     @Test
370     public void testWritePendingMax() throws Exception {
371         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
372
373         final IoInputStream asyncOut = getMockedIoInputStream();
374         final IoOutputStream asyncIn = getMockedIoOutputStream();
375         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
376
377         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
378         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
379         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
380
381         sshConnectListener.operationComplete(connectFuture);
382         sshAuthListener.operationComplete(getSuccessAuthFuture());
383         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
384
385         final ChannelPromise firstWritePromise = getMockedPromise();
386
387         // intercept listener for first write,
388         // so we can invoke successful write later thus simulate pending of the first write
389         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
390                 stubAddListener(ioWriteFuture);
391         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
392
393         final ChannelPromise secondWritePromise = getMockedPromise();
394         // now make write throw pending exception
395         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).writePacket(any(Buffer.class));
396         for (int i = 0; i < 1001; i++) {
397             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
398         }
399
400         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
401     }
402
403     @Test
404     public void testDisconnect() throws Exception {
405         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
406
407         final IoInputStream asyncOut = getMockedIoInputStream();
408         final IoOutputStream asyncIn = getMockedIoOutputStream();
409         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
410         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
411         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
412
413         sshConnectListener.operationComplete(connectFuture);
414         sshAuthListener.operationComplete(getSuccessAuthFuture());
415         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
416
417         final ChannelPromise disconnectPromise = getMockedPromise();
418         asyncSshHandler.disconnect(ctx, disconnectPromise);
419
420         verify(sshSession).close(anyBoolean());
421         verify(disconnectPromise).setSuccess();
422         verify(ctx).fireChannelInactive();
423     }
424
425     private static OpenFuture getSuccessOpenFuture() {
426         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
427         doReturn(true).when(failedOpenFuture).isOpened();
428         return failedOpenFuture;
429     }
430
431     private static AuthFuture getSuccessAuthFuture() {
432         final AuthFuture authFuture = mock(AuthFuture.class);
433         doReturn(true).when(authFuture).isSuccess();
434         return authFuture;
435     }
436
437     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
438         final ConnectFuture connectFuture = mock(ConnectFuture.class);
439         doReturn(true).when(connectFuture).isConnected();
440
441         doReturn(sshSession).when(connectFuture).getSession();
442         return connectFuture;
443     }
444
445     private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
446         final ClientSession sshSession = mock(ClientSession.class);
447
448         doReturn("sshSession").when(sshSession).toString();
449         doReturn("serverVersion").when(sshSession).getServerVersion();
450         doReturn(false).when(sshSession).isClosed();
451         doReturn(false).when(sshSession).isClosing();
452         final CloseFuture closeFuture = mock(CloseFuture.class);
453         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
454             @Override
455             public void onSuccess(final SshFutureListener<CloseFuture> result) {
456                 doReturn(true).when(closeFuture).isClosed();
457                 result.operationComplete(closeFuture);
458             }
459         }, MoreExecutors.directExecutor());
460         doReturn(closeFuture).when(sshSession).close(false);
461
462         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
463
464         return sshSession;
465     }
466
467     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
468                                                        final IoOutputStream asyncIn) throws IOException {
469         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
470         doReturn("subsystemChannel").when(subsystemChannel).toString();
471
472         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
473         final OpenFuture openFuture = mock(OpenFuture.class);
474
475         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
476             @Override
477             public void onSuccess(final SshFutureListener<OpenFuture> result) {
478                 sshChannelOpenListener = result;
479             }
480         }, MoreExecutors.directExecutor());
481
482         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
483
484         doReturn(openFuture).when(subsystemChannel).open();
485         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
486         return subsystemChannel;
487     }
488
489     private static IoOutputStream getMockedIoOutputStream() throws IOException {
490         final IoOutputStream mock = mock(IoOutputStream.class);
491         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
492         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(any());
493         doReturn(true).when(ioWriteFuture).isWritten();
494
495         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
496             @Override
497             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
498                 result.operationComplete(ioWriteFuture);
499             }
500         }, MoreExecutors.directExecutor());
501
502         doReturn(ioWriteFuture).when(mock).writePacket(any(Buffer.class));
503         doReturn(false).when(mock).isClosed();
504         doReturn(false).when(mock).isClosing();
505         return mock;
506     }
507
508     private static IoInputStream getMockedIoInputStream() {
509         final IoInputStream mock = mock(IoInputStream.class);
510         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
511         doReturn(null).when(ioReadFuture).getException();
512         doReturn(ioReadFuture).when(ioReadFuture).removeListener(any());
513         doReturn(ioReadFuture).when(mock).read(any());
514         doReturn(5).when(ioReadFuture).getRead();
515         doReturn(new ByteArrayBuffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
516         doReturn(ioReadFuture).when(ioReadFuture).addListener(any());
517
518         // Always success for read
519         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
520             @Override
521             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
522                 result.operationComplete(ioReadFuture);
523             }
524         }, MoreExecutors.directExecutor());
525
526         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
527         doReturn(false).when(mock).isClosed();
528         doReturn(false).when(mock).isClosing();
529         return mock;
530     }
531
532     @Test
533     public void testConnectFailOpenChannel() throws Exception {
534         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
535
536         final IoInputStream asyncOut = getMockedIoInputStream();
537         final IoOutputStream asyncIn = getMockedIoOutputStream();
538         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
539         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
540         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
541
542         sshConnectListener.operationComplete(connectFuture);
543
544         sshAuthListener.operationComplete(getSuccessAuthFuture());
545
546         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
547
548         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
549         verify(promise).setFailure(any(Throwable.class));
550     }
551
552     @Test
553     public void testConnectFailAuth() throws Exception {
554         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
555
556         final ClientSession sshSession = mock(ClientSession.class);
557         doReturn(true).when(sshSession).isClosed();
558         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
559
560         sshConnectListener.operationComplete(connectFuture);
561
562         final AuthFuture authFuture = getFailedAuthFuture();
563
564         sshAuthListener.operationComplete(authFuture);
565         verify(promise).setFailure(any(Throwable.class));
566     }
567
568     private static AuthFuture getFailedAuthFuture() {
569         final AuthFuture authFuture = mock(AuthFuture.class);
570         doReturn(false).when(authFuture).isSuccess();
571         doReturn(new IllegalStateException()).when(authFuture).getException();
572         return authFuture;
573     }
574
575     private static OpenFuture getFailedOpenFuture() {
576         final OpenFuture authFuture = mock(OpenFuture.class);
577         doReturn(false).when(authFuture).isOpened();
578         doReturn(new IllegalStateException()).when(authFuture).getException();
579         return authFuture;
580     }
581
582     @Test
583     public void testConnectFail() throws Exception {
584         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
585
586         final ConnectFuture connectFuture = getFailedConnectFuture();
587         sshConnectListener.operationComplete(connectFuture);
588         verify(promise).setFailure(any(Throwable.class));
589     }
590
591     private static ConnectFuture getFailedConnectFuture() {
592         final ConnectFuture connectFuture = mock(ConnectFuture.class);
593         doReturn(false).when(connectFuture).isConnected();
594         doReturn(new IllegalStateException()).when(connectFuture).getException();
595         return connectFuture;
596     }
597
598     private ChannelPromise getMockedPromise() {
599         return spy(new DefaultChannelPromise(channel));
600     }
601
602     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
603             implements FutureCallback<SshFutureListener<T>> {
604
605         @Override
606         public abstract void onSuccess(SshFutureListener<T> result);
607
608         @Override
609         public void onFailure(final Throwable throwable) {
610             throw new RuntimeException(throwable);
611         }
612     }
613 }