Remove netconf from commons/opendaylight pom
[controller.git] / opendaylight / netconf / netconf-netty-util / src / test / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Matchers.anyObject;
14 import static org.mockito.Matchers.anyString;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyZeroInteractions;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.SettableFuture;
28 import io.netty.buffer.ByteBuf;
29 import io.netty.buffer.Unpooled;
30 import io.netty.channel.Channel;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPromise;
34 import io.netty.channel.DefaultChannelPromise;
35 import java.io.IOException;
36 import java.net.SocketAddress;
37 import org.apache.sshd.ClientChannel;
38 import org.apache.sshd.ClientSession;
39 import org.apache.sshd.SshClient;
40 import org.apache.sshd.client.channel.ChannelSubsystem;
41 import org.apache.sshd.client.future.AuthFuture;
42 import org.apache.sshd.client.future.ConnectFuture;
43 import org.apache.sshd.client.future.OpenFuture;
44 import org.apache.sshd.common.future.CloseFuture;
45 import org.apache.sshd.common.future.SshFuture;
46 import org.apache.sshd.common.future.SshFutureListener;
47 import org.apache.sshd.common.io.IoInputStream;
48 import org.apache.sshd.common.io.IoOutputStream;
49 import org.apache.sshd.common.io.IoReadFuture;
50 import org.apache.sshd.common.io.IoWriteFuture;
51 import org.apache.sshd.common.util.Buffer;
52 import org.junit.After;
53 import org.junit.Before;
54 import org.junit.Ignore;
55 import org.junit.Test;
56 import org.mockito.Matchers;
57 import org.mockito.Mock;
58 import org.mockito.MockitoAnnotations;
59 import org.mockito.invocation.InvocationOnMock;
60 import org.mockito.stubbing.Answer;
61 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
62
63 public class AsyncSshHandlerTest {
64
65     @Mock
66     private SshClient sshClient;
67     @Mock
68     private AuthenticationHandler authHandler;
69     @Mock
70     private ChannelHandlerContext ctx;
71     @Mock
72     private Channel channel;
73     @Mock
74     private SocketAddress remoteAddress;
75     @Mock
76     private SocketAddress localAddress;
77
78     private AsyncSshHandler asyncSshHandler;
79
80     private SshFutureListener<ConnectFuture> sshConnectListener;
81     private SshFutureListener<AuthFuture> sshAuthListener;
82     private SshFutureListener<OpenFuture> sshChannelOpenListener;
83
84     private ChannelPromise promise;
85
86     @Before
87     public void setUp() throws Exception {
88         MockitoAnnotations.initMocks(this);
89         stubAuth();
90         stubSshClient();
91         stubChannel();
92         stubCtx();
93         stubRemoteAddress();
94
95         promise = getMockedPromise();
96
97         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
98     }
99
100     @After
101     public void tearDown() throws Exception {
102         sshConnectListener = null;
103         sshAuthListener = null;
104         sshChannelOpenListener = null;
105         promise = null;
106         asyncSshHandler.close(ctx, getMockedPromise());
107     }
108
109     private void stubAuth() throws IOException {
110         doReturn("usr").when(authHandler).getUsername();
111
112         final AuthFuture authFuture = mock(AuthFuture.class);
113         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
114             @Override
115             public void onSuccess(final SshFutureListener<AuthFuture> result) {
116                 sshAuthListener = result;
117             }
118         });
119         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
120     }
121
122     @SuppressWarnings("unchecked")
123     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
124         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
125
126         doAnswer(new Answer<Object>() {
127             @Override
128             public Object answer(final InvocationOnMock invocation) throws Throwable {
129                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
130                 return null;
131             }
132         }).when(future).addListener(any(SshFutureListener.class));
133
134         return listenerSettableFuture;
135     }
136
137     private void stubRemoteAddress() {
138         doReturn("remote").when(remoteAddress).toString();
139     }
140
141     private void stubCtx() {
142         doReturn(channel).when(ctx).channel();
143         doReturn(ctx).when(ctx).fireChannelActive();
144         doReturn(ctx).when(ctx).fireChannelInactive();
145         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
146         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
147         doReturn(getMockedPromise()).when(ctx).newPromise();
148     }
149
150     private void stubChannel() {
151         doReturn("channel").when(channel).toString();
152     }
153
154     private void stubSshClient() {
155         doNothing().when(sshClient).start();
156         final ConnectFuture connectFuture = mock(ConnectFuture.class);
157         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
158             @Override
159             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
160                 sshConnectListener = result;
161             }
162         });
163         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
164     }
165
166     @Test
167     public void testConnectSuccess() throws Exception {
168         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
169
170         final IoInputStream asyncOut = getMockedIoInputStream();
171         final IoOutputStream asyncIn = getMockedIoOutputStream();
172         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
173         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
174         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
175
176         sshConnectListener.operationComplete(connectFuture);
177         sshAuthListener.operationComplete(getSuccessAuthFuture());
178         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
179
180         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
181
182         verify(promise).setSuccess();
183         verify(ctx).fireChannelActive();
184     }
185
186     @Test
187     public void testRead() throws Exception {
188         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
189
190         final IoInputStream asyncOut = getMockedIoInputStream();
191         final IoOutputStream asyncIn = getMockedIoOutputStream();
192         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
193         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
194         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
195
196         sshConnectListener.operationComplete(connectFuture);
197         sshAuthListener.operationComplete(getSuccessAuthFuture());
198         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
199
200         verify(ctx).fireChannelRead(any(ByteBuf.class));
201     }
202
203     @Test
204     public void testReadClosed() throws Exception {
205         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
206
207         final IoInputStream asyncOut = getMockedIoInputStream();
208         final IoReadFuture mockedReadFuture = asyncOut.read(null);
209
210         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
211             @Override
212             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
213                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
214                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
215                 doReturn(true).when(asyncOut).isClosing();
216                 doReturn(true).when(asyncOut).isClosed();
217                 result.operationComplete(mockedReadFuture);
218             }
219         });
220
221         final IoOutputStream asyncIn = getMockedIoOutputStream();
222         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
223         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
224         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
225
226         sshConnectListener.operationComplete(connectFuture);
227         sshAuthListener.operationComplete(getSuccessAuthFuture());
228         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
229
230         verify(ctx).fireChannelInactive();
231     }
232
233     @Test
234     public void testReadFail() throws Exception {
235         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
236
237         final IoInputStream asyncOut = getMockedIoInputStream();
238         final IoReadFuture mockedReadFuture = asyncOut.read(null);
239
240         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
241             @Override
242             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
243                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
244                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
245                 result.operationComplete(mockedReadFuture);
246             }
247         });
248
249         final IoOutputStream asyncIn = getMockedIoOutputStream();
250         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
251         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
252         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
253
254         sshConnectListener.operationComplete(connectFuture);
255         sshAuthListener.operationComplete(getSuccessAuthFuture());
256         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
257
258         verify(ctx).fireChannelInactive();
259     }
260
261     @Test
262     public void testWrite() throws Exception {
263         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
264
265         final IoInputStream asyncOut = getMockedIoInputStream();
266         final IoOutputStream asyncIn = getMockedIoOutputStream();
267         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
268         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
269         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
270
271         sshConnectListener.operationComplete(connectFuture);
272         sshAuthListener.operationComplete(getSuccessAuthFuture());
273         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
274
275         final ChannelPromise writePromise = getMockedPromise();
276         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
277
278         verify(writePromise).setSuccess();
279     }
280
281     @Test
282     public void testWriteClosed() throws Exception {
283         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
284
285         final IoInputStream asyncOut = getMockedIoInputStream();
286         final IoOutputStream asyncIn = getMockedIoOutputStream();
287
288         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
289
290         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
291             @Override
292             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
293                 doReturn(false).when(ioWriteFuture).isWritten();
294                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
295                 doReturn(true).when(asyncIn).isClosing();
296                 doReturn(true).when(asyncIn).isClosed();
297                 result.operationComplete(ioWriteFuture);
298             }
299         });
300
301         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
302         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
303         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
304
305         sshConnectListener.operationComplete(connectFuture);
306         sshAuthListener.operationComplete(getSuccessAuthFuture());
307         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
308
309         final ChannelPromise writePromise = getMockedPromise();
310         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
311
312         verify(writePromise).setFailure(any(Throwable.class));
313     }
314
315     @Test
316     public void testWritePendingOne() throws Exception {
317         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
318
319         final IoInputStream asyncOut = getMockedIoInputStream();
320         final IoOutputStream asyncIn = getMockedIoOutputStream();
321         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
322
323         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
324         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
325         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
326
327         sshConnectListener.operationComplete(connectFuture);
328         sshAuthListener.operationComplete(getSuccessAuthFuture());
329         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
330
331         final ChannelPromise firstWritePromise = getMockedPromise();
332
333         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
334         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
335         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
336         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
337         // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
338         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
339
340         final ChannelPromise secondWritePromise = getMockedPromise();
341         // now make write throw pending exception
342         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
343         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
344
345         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
346
347         verifyZeroInteractions(firstWritePromise, secondWritePromise);
348
349         // make first write stop pending
350         firstWriteListener.operationComplete(ioWriteFuture);
351
352         // notify listener for second write that pending has ended
353         pendingListener.get().operationComplete(ioWriteFuture);
354
355         // verify both write promises successful
356         verify(firstWritePromise).setSuccess();
357         verify(secondWritePromise).setSuccess();
358     }
359
360     @Ignore("Pending queue is not limited")
361     @Test
362     public void testWritePendingMax() throws Exception {
363         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
364
365         final IoInputStream asyncOut = getMockedIoInputStream();
366         final IoOutputStream asyncIn = getMockedIoOutputStream();
367         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
368
369         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
370         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
371         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
372
373         sshConnectListener.operationComplete(connectFuture);
374         sshAuthListener.operationComplete(getSuccessAuthFuture());
375         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
376
377         final ChannelPromise firstWritePromise = getMockedPromise();
378
379         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
380         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
381         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
382
383         final ChannelPromise secondWritePromise = getMockedPromise();
384         // now make write throw pending exception
385         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
386         for (int i = 0; i < 1001; i++) {
387             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
388         }
389
390         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
391     }
392
393     @Test
394     public void testDisconnect() throws Exception {
395         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
396
397         final IoInputStream asyncOut = getMockedIoInputStream();
398         final IoOutputStream asyncIn = getMockedIoOutputStream();
399         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
400         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
401         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
402
403         sshConnectListener.operationComplete(connectFuture);
404         sshAuthListener.operationComplete(getSuccessAuthFuture());
405         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
406
407         final ChannelPromise disconnectPromise = getMockedPromise();
408         asyncSshHandler.disconnect(ctx, disconnectPromise);
409
410         verify(sshSession).close(anyBoolean());
411         verify(disconnectPromise).setSuccess();
412         verify(ctx).fireChannelInactive();
413     }
414
415     private static OpenFuture getSuccessOpenFuture() {
416         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
417         doReturn(true).when(failedOpenFuture).isOpened();
418         return failedOpenFuture;
419     }
420
421     private static AuthFuture getSuccessAuthFuture() {
422         final AuthFuture authFuture = mock(AuthFuture.class);
423         doReturn(true).when(authFuture).isSuccess();
424         return authFuture;
425     }
426
427     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
428         final ConnectFuture connectFuture = mock(ConnectFuture.class);
429         doReturn(true).when(connectFuture).isConnected();
430
431         doReturn(sshSession).when(connectFuture).getSession();
432         return connectFuture;
433     }
434
435     private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
436         final ClientSession sshSession = mock(ClientSession.class);
437
438         doReturn("sshSession").when(sshSession).toString();
439         doReturn("serverVersion").when(sshSession).getServerVersion();
440         doReturn(false).when(sshSession).isClosed();
441         doReturn(false).when(sshSession).isClosing();
442         final CloseFuture closeFuture = mock(CloseFuture.class);
443         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
444             @Override
445             public void onSuccess(final SshFutureListener<CloseFuture> result) {
446                 doReturn(true).when(closeFuture).isClosed();
447                 result.operationComplete(closeFuture);
448             }
449         });
450         doReturn(closeFuture).when(sshSession).close(false);
451
452         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
453
454         return sshSession;
455     }
456
457     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
458         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
459         doReturn("subsystemChannel").when(subsystemChannel).toString();
460
461         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
462         final OpenFuture openFuture = mock(OpenFuture.class);
463
464         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
465             @Override
466             public void onSuccess(final SshFutureListener<OpenFuture> result) {
467                 sshChannelOpenListener = result;
468             }
469         });
470
471         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
472
473         doReturn(openFuture).when(subsystemChannel).open();
474         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
475         return subsystemChannel;
476     }
477
478     private static IoOutputStream getMockedIoOutputStream() {
479         final IoOutputStream mock = mock(IoOutputStream.class);
480         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
481         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
482         doReturn(true).when(ioWriteFuture).isWritten();
483
484         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
485             @Override
486             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
487                 result.operationComplete(ioWriteFuture);
488             }
489         });
490
491         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
492         doReturn(false).when(mock).isClosed();
493         doReturn(false).when(mock).isClosing();
494         return mock;
495     }
496
497     private static IoInputStream getMockedIoInputStream() {
498         final IoInputStream mock = mock(IoInputStream.class);
499         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
500         doReturn(null).when(ioReadFuture).getException();
501         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
502         doReturn(5).when(ioReadFuture).getRead();
503         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
504         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
505
506         // Always success for read
507         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
508             @Override
509             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
510                 result.operationComplete(ioReadFuture);
511             }
512         });
513
514         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
515         doReturn(false).when(mock).isClosed();
516         doReturn(false).when(mock).isClosing();
517         return mock;
518     }
519
520     @Test
521     public void testConnectFailOpenChannel() throws Exception {
522         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
523
524         final IoInputStream asyncOut = getMockedIoInputStream();
525         final IoOutputStream asyncIn = getMockedIoOutputStream();
526         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
527         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
528         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
529
530         sshConnectListener.operationComplete(connectFuture);
531
532         sshAuthListener.operationComplete(getSuccessAuthFuture());
533
534         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
535
536         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
537         verify(promise).setFailure(any(Throwable.class));
538     }
539
540     @Test
541     public void testConnectFailAuth() throws Exception {
542         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
543
544         final ClientSession sshSession = mock(ClientSession.class);
545         doReturn(true).when(sshSession).isClosed();
546         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
547
548         sshConnectListener.operationComplete(connectFuture);
549
550         final AuthFuture authFuture = getFailedAuthFuture();
551
552         sshAuthListener.operationComplete(authFuture);
553         verify(promise).setFailure(any(Throwable.class));
554     }
555
556     private static AuthFuture getFailedAuthFuture() {
557         final AuthFuture authFuture = mock(AuthFuture.class);
558         doReturn(false).when(authFuture).isSuccess();
559         doReturn(new IllegalStateException()).when(authFuture).getException();
560         return authFuture;
561     }
562
563     private static OpenFuture getFailedOpenFuture() {
564         final OpenFuture authFuture = mock(OpenFuture.class);
565         doReturn(false).when(authFuture).isOpened();
566         doReturn(new IllegalStateException()).when(authFuture).getException();
567         return authFuture;
568     }
569
570     @Test
571     public void testConnectFail() throws Exception {
572         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
573
574         final ConnectFuture connectFuture = getFailedConnectFuture();
575         sshConnectListener.operationComplete(connectFuture);
576         verify(promise).setFailure(any(Throwable.class));
577     }
578
579     private static ConnectFuture getFailedConnectFuture() {
580         final ConnectFuture connectFuture = mock(ConnectFuture.class);
581         doReturn(false).when(connectFuture).isConnected();
582         doReturn(new IllegalStateException()).when(connectFuture).getException();
583         return connectFuture;
584     }
585
586     private ChannelPromise getMockedPromise() {
587         return spy(new DefaultChannelPromise(channel));
588     }
589
590     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
591
592         @Override
593         public abstract void onSuccess(final SshFutureListener<T> result);
594
595         @Override
596         public void onFailure(final Throwable t) {
597             throw new RuntimeException(t);
598         }
599     }
600 }