Add 'protocol-framework/' from commit '5d015c2c5f800d136406c15fcb64fd531d4ffc26'
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Matchers.anyObject;
14 import static org.mockito.Matchers.anyString;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyZeroInteractions;
24
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.SettableFuture;
29 import io.netty.buffer.ByteBuf;
30 import io.netty.buffer.Unpooled;
31 import io.netty.channel.Channel;
32 import io.netty.channel.ChannelFuture;
33 import io.netty.channel.ChannelHandlerContext;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.DefaultChannelPromise;
36 import io.netty.channel.EventLoop;
37 import java.io.IOException;
38 import java.net.SocketAddress;
39 import org.apache.sshd.ClientChannel;
40 import org.apache.sshd.ClientSession;
41 import org.apache.sshd.SshClient;
42 import org.apache.sshd.client.channel.ChannelSubsystem;
43 import org.apache.sshd.client.future.AuthFuture;
44 import org.apache.sshd.client.future.ConnectFuture;
45 import org.apache.sshd.client.future.OpenFuture;
46 import org.apache.sshd.common.future.CloseFuture;
47 import org.apache.sshd.common.future.SshFuture;
48 import org.apache.sshd.common.future.SshFutureListener;
49 import org.apache.sshd.common.io.IoInputStream;
50 import org.apache.sshd.common.io.IoOutputStream;
51 import org.apache.sshd.common.io.IoReadFuture;
52 import org.apache.sshd.common.io.IoWriteFuture;
53 import org.apache.sshd.common.util.Buffer;
54 import org.junit.After;
55 import org.junit.Before;
56 import org.junit.Ignore;
57 import org.junit.Test;
58 import org.mockito.Matchers;
59 import org.mockito.Mock;
60 import org.mockito.MockitoAnnotations;
61 import org.mockito.invocation.InvocationOnMock;
62 import org.mockito.stubbing.Answer;
63 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
64
65 public class AsyncSshHandlerTest {
66
67     @Mock
68     private SshClient sshClient;
69     @Mock
70     private AuthenticationHandler authHandler;
71     @Mock
72     private ChannelHandlerContext ctx;
73     @Mock
74     private Channel channel;
75     @Mock
76     private SocketAddress remoteAddress;
77     @Mock
78     private SocketAddress localAddress;
79     @Mock
80     private EventLoop eventLoop;
81
82     private AsyncSshHandler asyncSshHandler;
83
84     private SshFutureListener<ConnectFuture> sshConnectListener;
85     private SshFutureListener<AuthFuture> sshAuthListener;
86     private SshFutureListener<OpenFuture> sshChannelOpenListener;
87
88     private ChannelPromise promise;
89
90     @Before
91     public void setUp() throws Exception {
92         MockitoAnnotations.initMocks(this);
93         stubAuth();
94         stubSshClient();
95         stubChannel();
96         stubEventLoop();
97         stubCtx();
98         stubRemoteAddress();
99
100         promise = getMockedPromise();
101
102         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
103     }
104
105     @After
106     public void tearDown() throws Exception {
107         sshConnectListener = null;
108         sshAuthListener = null;
109         sshChannelOpenListener = null;
110         promise = null;
111         asyncSshHandler.close(ctx, getMockedPromise());
112     }
113
114     private void stubAuth() throws IOException {
115         doReturn("usr").when(authHandler).getUsername();
116
117         final AuthFuture authFuture = mock(AuthFuture.class);
118         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
119             @Override
120             public void onSuccess(final SshFutureListener<AuthFuture> result) {
121                 sshAuthListener = result;
122             }
123         });
124         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
125     }
126
127     @SuppressWarnings("unchecked")
128     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
129         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
130
131         doAnswer(new Answer<Object>() {
132             @Override
133             public Object answer(final InvocationOnMock invocation) throws Throwable {
134                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
135                 return null;
136             }
137         }).when(future).addListener(any(SshFutureListener.class));
138
139         return listenerSettableFuture;
140     }
141
142     private void stubRemoteAddress() {
143         doReturn("remote").when(remoteAddress).toString();
144     }
145
146     private void stubCtx() {
147         doReturn(channel).when(ctx).channel();
148         doReturn(ctx).when(ctx).fireChannelActive();
149         doReturn(ctx).when(ctx).fireChannelInactive();
150         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
151         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
152         doReturn(getMockedPromise()).when(ctx).newPromise();
153     }
154
155     private void stubChannel() {
156         doReturn("channel").when(channel).toString();
157     }
158
159     private void stubEventLoop() {
160         doReturn(eventLoop).when(channel).eventLoop();
161         doReturn(Boolean.TRUE).when(eventLoop).inEventLoop();
162     }
163
164     private void stubSshClient() {
165         doNothing().when(sshClient).start();
166         final ConnectFuture connectFuture = mock(ConnectFuture.class);
167         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
168             @Override
169             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
170                 sshConnectListener = result;
171             }
172         });
173         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
174     }
175
176     @Test
177     public void testConnectSuccess() throws Exception {
178         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
179
180         final IoInputStream asyncOut = getMockedIoInputStream();
181         final IoOutputStream asyncIn = getMockedIoOutputStream();
182         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
183         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
184         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
185
186         sshConnectListener.operationComplete(connectFuture);
187         sshAuthListener.operationComplete(getSuccessAuthFuture());
188         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
189
190         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
191
192         verify(promise).setSuccess();
193         verify(ctx).fireChannelActive();
194     }
195
196     @Test
197     public void testRead() throws Exception {
198         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
199
200         final IoInputStream asyncOut = getMockedIoInputStream();
201         final IoOutputStream asyncIn = getMockedIoOutputStream();
202         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
203         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
204         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
205
206         sshConnectListener.operationComplete(connectFuture);
207         sshAuthListener.operationComplete(getSuccessAuthFuture());
208         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
209
210         verify(ctx).fireChannelRead(any(ByteBuf.class));
211     }
212
213     @Test
214     public void testReadClosed() throws Exception {
215         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
216
217         final IoInputStream asyncOut = getMockedIoInputStream();
218         final IoReadFuture mockedReadFuture = asyncOut.read(null);
219
220         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
221             @Override
222             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
223                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
224                 doReturn(mockedReadFuture).when(mockedReadFuture)
225                         .removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
226                 doReturn(true).when(asyncOut).isClosing();
227                 doReturn(true).when(asyncOut).isClosed();
228                 result.operationComplete(mockedReadFuture);
229             }
230         });
231
232         final IoOutputStream asyncIn = getMockedIoOutputStream();
233         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
234         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
235         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
236
237         sshConnectListener.operationComplete(connectFuture);
238         sshAuthListener.operationComplete(getSuccessAuthFuture());
239         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
240
241         verify(ctx).fireChannelInactive();
242     }
243
244     @Test
245     public void testReadFail() throws Exception {
246         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
247
248         final IoInputStream asyncOut = getMockedIoInputStream();
249         final IoReadFuture mockedReadFuture = asyncOut.read(null);
250
251         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
252             @Override
253             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
254                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
255                 doReturn(mockedReadFuture).when(mockedReadFuture)
256                         .removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
257                 result.operationComplete(mockedReadFuture);
258             }
259         });
260
261         final IoOutputStream asyncIn = getMockedIoOutputStream();
262         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
263         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
264         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
265
266         sshConnectListener.operationComplete(connectFuture);
267         sshAuthListener.operationComplete(getSuccessAuthFuture());
268         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
269
270         verify(ctx).fireChannelInactive();
271     }
272
273     @Test
274     public void testWrite() throws Exception {
275         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
276
277         final IoInputStream asyncOut = getMockedIoInputStream();
278         final IoOutputStream asyncIn = getMockedIoOutputStream();
279         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
280         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
281         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
282
283         sshConnectListener.operationComplete(connectFuture);
284         sshAuthListener.operationComplete(getSuccessAuthFuture());
285         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
286
287         final ChannelPromise writePromise = getMockedPromise();
288         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
289
290         verify(writePromise).setSuccess();
291     }
292
293     @Test
294     public void testWriteClosed() throws Exception {
295         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
296
297         final IoInputStream asyncOut = getMockedIoInputStream();
298         final IoOutputStream asyncIn = getMockedIoOutputStream();
299
300         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
301
302         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
303             @Override
304             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
305                 doReturn(false).when(ioWriteFuture).isWritten();
306                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
307                 doReturn(true).when(asyncIn).isClosing();
308                 doReturn(true).when(asyncIn).isClosed();
309                 result.operationComplete(ioWriteFuture);
310             }
311         });
312
313         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
314         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
315         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
316
317         sshConnectListener.operationComplete(connectFuture);
318         sshAuthListener.operationComplete(getSuccessAuthFuture());
319         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
320
321         final ChannelPromise writePromise = getMockedPromise();
322         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
323
324         verify(writePromise).setFailure(any(Throwable.class));
325     }
326
327     @Test
328     public void testWritePendingOne() throws Exception {
329         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
330
331         final IoInputStream asyncOut = getMockedIoInputStream();
332         final IoOutputStream asyncIn = getMockedIoOutputStream();
333         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
334
335         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
336         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
337         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
338
339         sshConnectListener.operationComplete(connectFuture);
340         sshAuthListener.operationComplete(getSuccessAuthFuture());
341         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
342
343         final ChannelPromise firstWritePromise = getMockedPromise();
344
345         // intercept listener for first write,
346         // so we can invoke successful write later thus simulate pending of the first write
347         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
348                 stubAddListener(ioWriteFuture);
349         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
350         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
351         // intercept second listener,
352         // this is the listener for pending write for the pending write to know when pending state ended
353         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
354
355         final ChannelPromise secondWritePromise = getMockedPromise();
356         // now make write throw pending exception
357         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
358         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
359
360         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
361
362         verifyZeroInteractions(firstWritePromise, secondWritePromise);
363
364         // make first write stop pending
365         firstWriteListener.operationComplete(ioWriteFuture);
366
367         // notify listener for second write that pending has ended
368         pendingListener.get().operationComplete(ioWriteFuture);
369
370         // verify both write promises successful
371         verify(firstWritePromise).setSuccess();
372         verify(secondWritePromise).setSuccess();
373     }
374
375     @Ignore("Pending queue is not limited")
376     @Test
377     public void testWritePendingMax() throws Exception {
378         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
379
380         final IoInputStream asyncOut = getMockedIoInputStream();
381         final IoOutputStream asyncIn = getMockedIoOutputStream();
382         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
383
384         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
385         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
386         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
387
388         sshConnectListener.operationComplete(connectFuture);
389         sshAuthListener.operationComplete(getSuccessAuthFuture());
390         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
391
392         final ChannelPromise firstWritePromise = getMockedPromise();
393
394         // intercept listener for first write,
395         // so we can invoke successful write later thus simulate pending of the first write
396         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
397                 stubAddListener(ioWriteFuture);
398         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
399
400         final ChannelPromise secondWritePromise = getMockedPromise();
401         // now make write throw pending exception
402         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
403         for (int i = 0; i < 1001; i++) {
404             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
405         }
406
407         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
408     }
409
410     @Test
411     public void testDisconnect() throws Exception {
412         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
413
414         final IoInputStream asyncOut = getMockedIoInputStream();
415         final IoOutputStream asyncIn = getMockedIoOutputStream();
416         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
417         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
418         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
419
420         sshConnectListener.operationComplete(connectFuture);
421         sshAuthListener.operationComplete(getSuccessAuthFuture());
422         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
423
424         final ChannelPromise disconnectPromise = getMockedPromise();
425         asyncSshHandler.disconnect(ctx, disconnectPromise);
426
427         verify(sshSession).close(anyBoolean());
428         verify(disconnectPromise).setSuccess();
429         verify(ctx).fireChannelInactive();
430     }
431
432     private static OpenFuture getSuccessOpenFuture() {
433         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
434         doReturn(true).when(failedOpenFuture).isOpened();
435         return failedOpenFuture;
436     }
437
438     private static AuthFuture getSuccessAuthFuture() {
439         final AuthFuture authFuture = mock(AuthFuture.class);
440         doReturn(true).when(authFuture).isSuccess();
441         return authFuture;
442     }
443
444     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
445         final ConnectFuture connectFuture = mock(ConnectFuture.class);
446         doReturn(true).when(connectFuture).isConnected();
447
448         doReturn(sshSession).when(connectFuture).getSession();
449         return connectFuture;
450     }
451
452     private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
453         final ClientSession sshSession = mock(ClientSession.class);
454
455         doReturn("sshSession").when(sshSession).toString();
456         doReturn("serverVersion").when(sshSession).getServerVersion();
457         doReturn(false).when(sshSession).isClosed();
458         doReturn(false).when(sshSession).isClosing();
459         final CloseFuture closeFuture = mock(CloseFuture.class);
460         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
461             @Override
462             public void onSuccess(final SshFutureListener<CloseFuture> result) {
463                 doReturn(true).when(closeFuture).isClosed();
464                 result.operationComplete(closeFuture);
465             }
466         });
467         doReturn(closeFuture).when(sshSession).close(false);
468
469         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
470
471         return sshSession;
472     }
473
474     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
475                                                        final IoOutputStream asyncIn) throws IOException {
476         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
477         doReturn("subsystemChannel").when(subsystemChannel).toString();
478
479         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
480         final OpenFuture openFuture = mock(OpenFuture.class);
481
482         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
483             @Override
484             public void onSuccess(final SshFutureListener<OpenFuture> result) {
485                 sshChannelOpenListener = result;
486             }
487         });
488
489         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
490
491         doReturn(openFuture).when(subsystemChannel).open();
492         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
493         return subsystemChannel;
494     }
495
496     private static IoOutputStream getMockedIoOutputStream() {
497         final IoOutputStream mock = mock(IoOutputStream.class);
498         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
499         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
500         doReturn(true).when(ioWriteFuture).isWritten();
501
502         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
503             @Override
504             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
505                 result.operationComplete(ioWriteFuture);
506             }
507         });
508
509         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
510         doReturn(false).when(mock).isClosed();
511         doReturn(false).when(mock).isClosing();
512         return mock;
513     }
514
515     private static IoInputStream getMockedIoInputStream() {
516         final IoInputStream mock = mock(IoInputStream.class);
517         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
518         doReturn(null).when(ioReadFuture).getException();
519         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
520         doReturn(5).when(ioReadFuture).getRead();
521         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
522         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
523
524         // Always success for read
525         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
526             @Override
527             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
528                 result.operationComplete(ioReadFuture);
529             }
530         });
531
532         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
533         doReturn(false).when(mock).isClosed();
534         doReturn(false).when(mock).isClosing();
535         return mock;
536     }
537
538     @Test
539     public void testConnectFailOpenChannel() throws Exception {
540         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
541
542         final IoInputStream asyncOut = getMockedIoInputStream();
543         final IoOutputStream asyncIn = getMockedIoOutputStream();
544         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
545         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
546         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
547
548         sshConnectListener.operationComplete(connectFuture);
549
550         sshAuthListener.operationComplete(getSuccessAuthFuture());
551
552         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
553
554         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
555         verify(promise).setFailure(any(Throwable.class));
556     }
557
558     @Test
559     public void testConnectFailAuth() throws Exception {
560         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
561
562         final ClientSession sshSession = mock(ClientSession.class);
563         doReturn(true).when(sshSession).isClosed();
564         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
565
566         sshConnectListener.operationComplete(connectFuture);
567
568         final AuthFuture authFuture = getFailedAuthFuture();
569
570         sshAuthListener.operationComplete(authFuture);
571         verify(promise).setFailure(any(Throwable.class));
572     }
573
574     private static AuthFuture getFailedAuthFuture() {
575         final AuthFuture authFuture = mock(AuthFuture.class);
576         doReturn(false).when(authFuture).isSuccess();
577         doReturn(new IllegalStateException()).when(authFuture).getException();
578         return authFuture;
579     }
580
581     private static OpenFuture getFailedOpenFuture() {
582         final OpenFuture authFuture = mock(OpenFuture.class);
583         doReturn(false).when(authFuture).isOpened();
584         doReturn(new IllegalStateException()).when(authFuture).getException();
585         return authFuture;
586     }
587
588     @Test
589     public void testConnectFail() throws Exception {
590         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
591
592         final ConnectFuture connectFuture = getFailedConnectFuture();
593         sshConnectListener.operationComplete(connectFuture);
594         verify(promise).setFailure(any(Throwable.class));
595     }
596
597     private static ConnectFuture getFailedConnectFuture() {
598         final ConnectFuture connectFuture = mock(ConnectFuture.class);
599         doReturn(false).when(connectFuture).isConnected();
600         doReturn(new IllegalStateException()).when(connectFuture).getException();
601         return connectFuture;
602     }
603
604     private ChannelPromise getMockedPromise() {
605         return spy(new DefaultChannelPromise(channel));
606     }
607
608     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
609             implements FutureCallback<SshFutureListener<T>> {
610
611         @Override
612         public abstract void onSuccess(SshFutureListener<T> result);
613
614         @Override
615         public void onFailure(final Throwable throwable) {
616             throw new RuntimeException(throwable);
617         }
618     }
619 }