NETCONF-125 connection timeout and between timeout are fixed
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
9
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.anyBoolean;
12 import static org.mockito.ArgumentMatchers.anyObject;
13 import static org.mockito.ArgumentMatchers.anyString;
14 import static org.mockito.Mockito.doAnswer;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.mockito.Mockito.verifyZeroInteractions;
23
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.SettableFuture;
29 import io.netty.buffer.ByteBuf;
30 import io.netty.buffer.Unpooled;
31 import io.netty.channel.Channel;
32 import io.netty.channel.ChannelConfig;
33 import io.netty.channel.ChannelFuture;
34 import io.netty.channel.ChannelHandlerContext;
35 import io.netty.channel.ChannelPromise;
36 import io.netty.channel.DefaultChannelPromise;
37 import io.netty.channel.EventLoop;
38 import java.io.IOException;
39 import java.net.SocketAddress;
40 import java.util.concurrent.TimeUnit;
41 import org.apache.sshd.client.SshClient;
42 import org.apache.sshd.client.channel.ChannelSubsystem;
43 import org.apache.sshd.client.channel.ClientChannel;
44 import org.apache.sshd.client.future.AuthFuture;
45 import org.apache.sshd.client.future.ConnectFuture;
46 import org.apache.sshd.client.future.OpenFuture;
47 import org.apache.sshd.client.session.ClientSession;
48 import org.apache.sshd.common.future.CloseFuture;
49 import org.apache.sshd.common.future.SshFuture;
50 import org.apache.sshd.common.future.SshFutureListener;
51 import org.apache.sshd.common.io.IoInputStream;
52 import org.apache.sshd.common.io.IoOutputStream;
53 import org.apache.sshd.common.io.IoReadFuture;
54 import org.apache.sshd.common.io.IoWriteFuture;
55 import org.apache.sshd.common.util.buffer.Buffer;
56 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
57 import org.junit.After;
58 import org.junit.Before;
59 import org.junit.Ignore;
60 import org.junit.Test;
61 import org.mockito.Mock;
62 import org.mockito.MockitoAnnotations;
63 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
64
65 public class AsyncSshHandlerTest {
66
67     @Mock
68     private SshClient sshClient;
69     @Mock
70     private AuthenticationHandler authHandler;
71     @Mock
72     private ChannelHandlerContext ctx;
73     @Mock
74     private Channel channel;
75     @Mock
76     private SocketAddress remoteAddress;
77     @Mock
78     private SocketAddress localAddress;
79     @Mock
80     private EventLoop eventLoop;
81     @Mock
82     private ChannelConfig channelConfig;
83
84     private AsyncSshHandler asyncSshHandler;
85
86     private SshFutureListener<ConnectFuture> sshConnectListener;
87     private SshFutureListener<AuthFuture> sshAuthListener;
88     private SshFutureListener<OpenFuture> sshChannelOpenListener;
89
90     private ChannelPromise promise;
91
92     @Before
93     public void setUp() throws Exception {
94         MockitoAnnotations.initMocks(this);
95         stubAuth();
96         stubSshClient();
97         stubChannel();
98         stubEventLoop();
99         stubCtx();
100         stubRemoteAddress();
101
102         promise = getMockedPromise();
103
104         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
105     }
106
107     @After
108     public void tearDown() throws Exception {
109         sshConnectListener = null;
110         sshAuthListener = null;
111         sshChannelOpenListener = null;
112         promise = null;
113         asyncSshHandler.close(ctx, getMockedPromise());
114     }
115
116     private void stubAuth() throws IOException {
117         doReturn("usr").when(authHandler).getUsername();
118
119         final AuthFuture authFuture = mock(AuthFuture.class);
120         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
121             @Override
122             public void onSuccess(final SshFutureListener<AuthFuture> result) {
123                 sshAuthListener = result;
124             }
125         }, MoreExecutors.directExecutor());
126         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
127     }
128
129     @SuppressWarnings("unchecked")
130     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
131         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
132
133         doAnswer(invocation -> {
134             listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
135             return null;
136         }).when(future).addListener(any(SshFutureListener.class));
137
138         return listenerSettableFuture;
139     }
140
141     private void stubRemoteAddress() {
142         doReturn("remote").when(remoteAddress).toString();
143     }
144
145     private void stubCtx() {
146         doReturn(channel).when(ctx).channel();
147         doReturn(ctx).when(ctx).fireChannelActive();
148         doReturn(ctx).when(ctx).fireChannelInactive();
149         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
150         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
151         doReturn(getMockedPromise()).when(ctx).newPromise();
152     }
153
154     private void stubChannel() {
155         doReturn("channel").when(channel).toString();
156     }
157
158     private void stubEventLoop() {
159         doReturn(eventLoop).when(channel).eventLoop();
160         doReturn(Boolean.TRUE).when(eventLoop).inEventLoop();
161     }
162
163     private void stubSshClient() throws IOException {
164         doNothing().when(sshClient).start();
165         final ConnectFuture connectFuture = mock(ConnectFuture.class);
166         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
167             @Override
168             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
169                 sshConnectListener = result;
170             }
171         }, MoreExecutors.directExecutor());
172         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
173         doReturn(channelConfig).when(channel).config();
174         doReturn(1).when(channelConfig).getConnectTimeoutMillis();
175         doReturn(connectFuture).when(connectFuture).verify(1,TimeUnit.MILLISECONDS);
176     }
177
178     @Test
179     public void testConnectSuccess() throws Exception {
180         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
181
182         final IoInputStream asyncOut = getMockedIoInputStream();
183         final IoOutputStream asyncIn = getMockedIoOutputStream();
184         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
185         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
186         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
187
188         sshConnectListener.operationComplete(connectFuture);
189         sshAuthListener.operationComplete(getSuccessAuthFuture());
190         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
191
192         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
193
194         verify(promise).setSuccess();
195         verify(ctx).fireChannelActive();
196     }
197
198     @Test
199     public void testRead() throws Exception {
200         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
201
202         final IoInputStream asyncOut = getMockedIoInputStream();
203         final IoOutputStream asyncIn = getMockedIoOutputStream();
204         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
205         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
206         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
207
208         sshConnectListener.operationComplete(connectFuture);
209         sshAuthListener.operationComplete(getSuccessAuthFuture());
210         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
211
212         verify(ctx).fireChannelRead(any(ByteBuf.class));
213     }
214
215     @Test
216     public void testReadClosed() throws Exception {
217         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
218
219         final IoInputStream asyncOut = getMockedIoInputStream();
220         final IoReadFuture mockedReadFuture = asyncOut.read(null);
221
222         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
223             @Override
224             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
225                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
226                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(any());
227                 doReturn(true).when(asyncOut).isClosing();
228                 doReturn(true).when(asyncOut).isClosed();
229                 result.operationComplete(mockedReadFuture);
230             }
231         }, MoreExecutors.directExecutor());
232
233         final IoOutputStream asyncIn = getMockedIoOutputStream();
234         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
235         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
236         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
237
238         sshConnectListener.operationComplete(connectFuture);
239         sshAuthListener.operationComplete(getSuccessAuthFuture());
240         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
241
242         verify(ctx).fireChannelInactive();
243     }
244
245     @Test
246     public void testReadFail() throws Exception {
247         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
248
249         final IoInputStream asyncOut = getMockedIoInputStream();
250         final IoReadFuture mockedReadFuture = asyncOut.read(null);
251
252         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
253             @Override
254             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
255                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
256                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(any());
257                 result.operationComplete(mockedReadFuture);
258             }
259         }, MoreExecutors.directExecutor());
260
261         final IoOutputStream asyncIn = getMockedIoOutputStream();
262         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
263         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
264         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
265
266         sshConnectListener.operationComplete(connectFuture);
267         sshAuthListener.operationComplete(getSuccessAuthFuture());
268         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
269
270         verify(ctx).fireChannelInactive();
271     }
272
273     @Test
274     public void testWrite() throws Exception {
275         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
276
277         final IoInputStream asyncOut = getMockedIoInputStream();
278         final IoOutputStream asyncIn = getMockedIoOutputStream();
279         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
280         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
281         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
282
283         sshConnectListener.operationComplete(connectFuture);
284         sshAuthListener.operationComplete(getSuccessAuthFuture());
285         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
286
287         final ChannelPromise writePromise = getMockedPromise();
288         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
289
290         verify(writePromise).setSuccess();
291     }
292
293     @Test
294     public void testWriteClosed() throws Exception {
295         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
296
297         final IoInputStream asyncOut = getMockedIoInputStream();
298         final IoOutputStream asyncIn = getMockedIoOutputStream();
299
300         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
301
302         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
303             @Override
304             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
305                 doReturn(false).when(ioWriteFuture).isWritten();
306                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
307                 doReturn(true).when(asyncIn).isClosing();
308                 doReturn(true).when(asyncIn).isClosed();
309                 result.operationComplete(ioWriteFuture);
310             }
311         }, MoreExecutors.directExecutor());
312
313         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
314         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
315         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
316
317         sshConnectListener.operationComplete(connectFuture);
318         sshAuthListener.operationComplete(getSuccessAuthFuture());
319         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
320
321         final ChannelPromise writePromise = getMockedPromise();
322         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
323
324         verify(writePromise).setFailure(any(Throwable.class));
325     }
326
327     @Test
328     public void testWritePendingOne() throws Exception {
329         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
330
331         final IoInputStream asyncOut = getMockedIoInputStream();
332         final IoOutputStream asyncIn = getMockedIoOutputStream();
333         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
334
335         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
336         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
337         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
338
339         sshConnectListener.operationComplete(connectFuture);
340         sshAuthListener.operationComplete(getSuccessAuthFuture());
341         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
342
343         final ChannelPromise firstWritePromise = getMockedPromise();
344
345         // intercept listener for first write,
346         // so we can invoke successful write later thus simulate pending of the first write
347         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
348                 stubAddListener(ioWriteFuture);
349         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
350         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
351         // intercept second listener,
352         // this is the listener for pending write for the pending write to know when pending state ended
353         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
354
355         final ChannelPromise secondWritePromise = getMockedPromise();
356         // now make write throw pending exception
357         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).writePacket(any(Buffer.class));
358         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
359
360         doReturn(ioWriteFuture).when(asyncIn).writePacket(any(Buffer.class));
361
362         verifyZeroInteractions(firstWritePromise, secondWritePromise);
363
364         // make first write stop pending
365         firstWriteListener.operationComplete(ioWriteFuture);
366
367         // notify listener for second write that pending has ended
368         pendingListener.get().operationComplete(ioWriteFuture);
369
370         // verify both write promises successful
371         verify(firstWritePromise).setSuccess();
372         verify(secondWritePromise).setSuccess();
373     }
374
375     @Ignore("Pending queue is not limited")
376     @Test
377     public void testWritePendingMax() throws Exception {
378         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
379
380         final IoInputStream asyncOut = getMockedIoInputStream();
381         final IoOutputStream asyncIn = getMockedIoOutputStream();
382         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
383
384         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
385         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
386         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
387
388         sshConnectListener.operationComplete(connectFuture);
389         sshAuthListener.operationComplete(getSuccessAuthFuture());
390         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
391
392         final ChannelPromise firstWritePromise = getMockedPromise();
393
394         // intercept listener for first write,
395         // so we can invoke successful write later thus simulate pending of the first write
396         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
397                 stubAddListener(ioWriteFuture);
398         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
399
400         final ChannelPromise secondWritePromise = getMockedPromise();
401         // now make write throw pending exception
402         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).writePacket(any(Buffer.class));
403         for (int i = 0; i < 1001; i++) {
404             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
405         }
406
407         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
408     }
409
410     @Test
411     public void testDisconnect() throws Exception {
412         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
413
414         final IoInputStream asyncOut = getMockedIoInputStream();
415         final IoOutputStream asyncIn = getMockedIoOutputStream();
416         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
417         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
418         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
419
420         sshConnectListener.operationComplete(connectFuture);
421         sshAuthListener.operationComplete(getSuccessAuthFuture());
422         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
423
424         final ChannelPromise disconnectPromise = getMockedPromise();
425         asyncSshHandler.disconnect(ctx, disconnectPromise);
426
427         verify(sshSession).close(anyBoolean());
428         verify(disconnectPromise).setSuccess();
429         verify(ctx).fireChannelInactive();
430     }
431
432     private static OpenFuture getSuccessOpenFuture() {
433         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
434         doReturn(true).when(failedOpenFuture).isOpened();
435         return failedOpenFuture;
436     }
437
438     private static AuthFuture getSuccessAuthFuture() {
439         final AuthFuture authFuture = mock(AuthFuture.class);
440         doReturn(true).when(authFuture).isSuccess();
441         return authFuture;
442     }
443
444     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
445         final ConnectFuture connectFuture = mock(ConnectFuture.class);
446         doReturn(true).when(connectFuture).isConnected();
447
448         doReturn(sshSession).when(connectFuture).getSession();
449         return connectFuture;
450     }
451
452     private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
453         final ClientSession sshSession = mock(ClientSession.class);
454
455         doReturn("sshSession").when(sshSession).toString();
456         doReturn("serverVersion").when(sshSession).getServerVersion();
457         doReturn(false).when(sshSession).isClosed();
458         doReturn(false).when(sshSession).isClosing();
459         final CloseFuture closeFuture = mock(CloseFuture.class);
460         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
461             @Override
462             public void onSuccess(final SshFutureListener<CloseFuture> result) {
463                 doReturn(true).when(closeFuture).isClosed();
464                 result.operationComplete(closeFuture);
465             }
466         }, MoreExecutors.directExecutor());
467         doReturn(closeFuture).when(sshSession).close(false);
468
469         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
470
471         return sshSession;
472     }
473
474     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
475                                                        final IoOutputStream asyncIn) throws IOException {
476         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
477         doReturn("subsystemChannel").when(subsystemChannel).toString();
478
479         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
480         final OpenFuture openFuture = mock(OpenFuture.class);
481
482         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
483             @Override
484             public void onSuccess(final SshFutureListener<OpenFuture> result) {
485                 sshChannelOpenListener = result;
486             }
487         }, MoreExecutors.directExecutor());
488
489         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
490
491         doReturn(openFuture).when(subsystemChannel).open();
492         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
493         return subsystemChannel;
494     }
495
496     private static IoOutputStream getMockedIoOutputStream() throws IOException {
497         final IoOutputStream mock = mock(IoOutputStream.class);
498         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
499         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(any());
500         doReturn(true).when(ioWriteFuture).isWritten();
501
502         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
503             @Override
504             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
505                 result.operationComplete(ioWriteFuture);
506             }
507         }, MoreExecutors.directExecutor());
508
509         doReturn(ioWriteFuture).when(mock).writePacket(any(Buffer.class));
510         doReturn(false).when(mock).isClosed();
511         doReturn(false).when(mock).isClosing();
512         return mock;
513     }
514
515     private static IoInputStream getMockedIoInputStream() {
516         final IoInputStream mock = mock(IoInputStream.class);
517         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
518         doReturn(null).when(ioReadFuture).getException();
519         doReturn(ioReadFuture).when(ioReadFuture).removeListener(any());
520         doReturn(ioReadFuture).when(mock).read(any());
521         doReturn(5).when(ioReadFuture).getRead();
522         doReturn(new ByteArrayBuffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
523         doReturn(ioReadFuture).when(ioReadFuture).addListener(any());
524
525         // Always success for read
526         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
527             @Override
528             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
529                 result.operationComplete(ioReadFuture);
530             }
531         }, MoreExecutors.directExecutor());
532
533         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
534         doReturn(false).when(mock).isClosed();
535         doReturn(false).when(mock).isClosing();
536         return mock;
537     }
538
539     @Test
540     public void testConnectFailOpenChannel() throws Exception {
541         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
542
543         final IoInputStream asyncOut = getMockedIoInputStream();
544         final IoOutputStream asyncIn = getMockedIoOutputStream();
545         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
546         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
547         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
548
549         sshConnectListener.operationComplete(connectFuture);
550
551         sshAuthListener.operationComplete(getSuccessAuthFuture());
552
553         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
554
555         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
556         verify(promise).setFailure(any(Throwable.class));
557     }
558
559     @Test
560     public void testConnectFailAuth() throws Exception {
561         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
562
563         final ClientSession sshSession = mock(ClientSession.class);
564         doReturn(true).when(sshSession).isClosed();
565         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
566
567         sshConnectListener.operationComplete(connectFuture);
568
569         final AuthFuture authFuture = getFailedAuthFuture();
570
571         sshAuthListener.operationComplete(authFuture);
572         verify(promise).setFailure(any(Throwable.class));
573     }
574
575     private static AuthFuture getFailedAuthFuture() {
576         final AuthFuture authFuture = mock(AuthFuture.class);
577         doReturn(false).when(authFuture).isSuccess();
578         doReturn(new IllegalStateException()).when(authFuture).getException();
579         return authFuture;
580     }
581
582     private static OpenFuture getFailedOpenFuture() {
583         final OpenFuture authFuture = mock(OpenFuture.class);
584         doReturn(false).when(authFuture).isOpened();
585         doReturn(new IllegalStateException()).when(authFuture).getException();
586         return authFuture;
587     }
588
589     @Test
590     public void testConnectFail() throws Exception {
591         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
592
593         final ConnectFuture connectFuture = getFailedConnectFuture();
594         sshConnectListener.operationComplete(connectFuture);
595         verify(promise).setFailure(any(Throwable.class));
596     }
597
598     private static ConnectFuture getFailedConnectFuture() {
599         final ConnectFuture connectFuture = mock(ConnectFuture.class);
600         doReturn(false).when(connectFuture).isConnected();
601         doReturn(new IllegalStateException()).when(connectFuture).getException();
602         return connectFuture;
603     }
604
605     private ChannelPromise getMockedPromise() {
606         return spy(new DefaultChannelPromise(channel));
607     }
608
609     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
610             implements FutureCallback<SshFutureListener<T>> {
611
612         @Override
613         public abstract void onSuccess(SshFutureListener<T> result);
614
615         @Override
616         public void onFailure(final Throwable throwable) {
617             throw new RuntimeException(throwable);
618         }
619     }
620 }