d0fc43d04aa2ce656f7616072522437009e92342
[controller.git] / opendaylight / netconf / netconf-netty-util / src / test / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static org.junit.Assert.fail;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyBoolean;
14 import static org.mockito.Matchers.anyObject;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24 import static org.mockito.Mockito.verifyZeroInteractions;
25
26 import java.io.IOException;
27 import java.net.SocketAddress;
28
29 import org.apache.sshd.ClientChannel;
30 import org.apache.sshd.ClientSession;
31 import org.apache.sshd.SshClient;
32 import org.apache.sshd.client.channel.ChannelSubsystem;
33 import org.apache.sshd.client.future.AuthFuture;
34 import org.apache.sshd.client.future.ConnectFuture;
35 import org.apache.sshd.client.future.OpenFuture;
36 import org.apache.sshd.common.future.CloseFuture;
37 import org.apache.sshd.common.future.SshFuture;
38 import org.apache.sshd.common.future.SshFutureListener;
39 import org.apache.sshd.common.io.IoInputStream;
40 import org.apache.sshd.common.io.IoOutputStream;
41 import org.apache.sshd.common.io.IoReadFuture;
42 import org.apache.sshd.common.io.IoWriteFuture;
43 import org.apache.sshd.common.util.Buffer;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Ignore;
47 import org.junit.Test;
48 import org.mockito.Matchers;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
54
55 import com.google.common.util.concurrent.FutureCallback;
56 import com.google.common.util.concurrent.Futures;
57 import com.google.common.util.concurrent.ListenableFuture;
58 import com.google.common.util.concurrent.SettableFuture;
59
60 import io.netty.buffer.ByteBuf;
61 import io.netty.buffer.Unpooled;
62 import io.netty.channel.Channel;
63 import io.netty.channel.ChannelHandlerContext;
64 import io.netty.channel.ChannelPromise;
65
66 public class AsyncSshHandlerTest {
67
68     @Mock
69     private SshClient sshClient;
70     @Mock
71     private AuthenticationHandler authHandler;
72     @Mock
73     private ChannelHandlerContext ctx;
74     @Mock
75     private Channel channel;
76     @Mock
77     private SocketAddress remoteAddress;
78     @Mock
79     private SocketAddress localAddress;
80
81     private AsyncSshHandler asyncSshHandler;
82
83     private SshFutureListener<ConnectFuture> sshConnectListener;
84     private SshFutureListener<AuthFuture> sshAuthListener;
85     private SshFutureListener<OpenFuture> sshChannelOpenListener;
86
87     private ChannelPromise promise;
88
89     @Before
90     public void setUp() throws Exception {
91         MockitoAnnotations.initMocks(this);
92         stubAuth();
93         stubSshClient();
94         stubChannel();
95         stubCtx();
96         stubRemoteAddress();
97
98         promise = getMockedPromise();
99
100         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
101     }
102
103     @After
104     public void tearDown() throws Exception {
105         sshConnectListener = null;
106         sshAuthListener = null;
107         sshChannelOpenListener = null;
108         promise = null;
109         asyncSshHandler.close(ctx, getMockedPromise());
110     }
111
112     private void stubAuth() throws IOException {
113         doReturn("usr").when(authHandler).getUsername();
114
115         final AuthFuture authFuture = mock(AuthFuture.class);
116         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
117             @Override
118             public void onSuccess(final SshFutureListener<AuthFuture> result) {
119                 sshAuthListener = result;
120             }
121         });
122         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
123     }
124
125     @SuppressWarnings("unchecked")
126     private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
127         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
128
129         doAnswer(new Answer() {
130             @Override
131             public Object answer(final InvocationOnMock invocation) throws Throwable {
132                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
133                 return null;
134             }
135         }).when(future).addListener(any(SshFutureListener.class));
136
137         return listenerSettableFuture;
138     }
139
140     private void stubRemoteAddress() {
141         doReturn("remote").when(remoteAddress).toString();
142     }
143
144     private void stubCtx() {
145         doReturn(channel).when(ctx).channel();
146         doReturn(ctx).when(ctx).fireChannelActive();
147         doReturn(ctx).when(ctx).fireChannelInactive();
148         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
149         doReturn(getMockedPromise()).when(ctx).newPromise();
150     }
151
152     private void stubChannel() {
153         doReturn("channel").when(channel).toString();
154     }
155
156     private void stubSshClient() {
157         doNothing().when(sshClient).start();
158         final ConnectFuture connectFuture = mock(ConnectFuture.class);
159         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
160             @Override
161             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
162                 sshConnectListener = result;
163             }
164         });
165         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
166     }
167
168     @Test
169     public void testConnectSuccess() throws Exception {
170         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
171
172         final IoInputStream asyncOut = getMockedIoInputStream();
173         final IoOutputStream asyncIn = getMockedIoOutputStream();
174         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
175         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
176         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
177
178         sshConnectListener.operationComplete(connectFuture);
179         sshAuthListener.operationComplete(getSuccessAuthFuture());
180         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
181
182         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
183
184         verify(promise).setSuccess();
185         verifyNoMoreInteractions(promise);
186         verify(ctx).fireChannelActive();
187     }
188
189     @Test
190     public void testRead() throws Exception {
191         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
192
193         final IoInputStream asyncOut = getMockedIoInputStream();
194         final IoOutputStream asyncIn = getMockedIoOutputStream();
195         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
198
199         sshConnectListener.operationComplete(connectFuture);
200         sshAuthListener.operationComplete(getSuccessAuthFuture());
201         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
202
203         verify(ctx).fireChannelRead(any(ByteBuf.class));
204     }
205
206     @Test
207     public void testReadClosed() throws Exception {
208         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
209
210         final IoInputStream asyncOut = getMockedIoInputStream();
211         final IoReadFuture mockedReadFuture = asyncOut.read(null);
212
213         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
214             @Override
215             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
216                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
217                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
218                 doReturn(true).when(asyncOut).isClosing();
219                 doReturn(true).when(asyncOut).isClosed();
220                 result.operationComplete(mockedReadFuture);
221             }
222         });
223
224         final IoOutputStream asyncIn = getMockedIoOutputStream();
225         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
226         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
227         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
228
229         sshConnectListener.operationComplete(connectFuture);
230         sshAuthListener.operationComplete(getSuccessAuthFuture());
231         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
232
233         verify(ctx).fireChannelInactive();
234     }
235
236     @Test
237     public void testReadFail() throws Exception {
238         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
239
240         final IoInputStream asyncOut = getMockedIoInputStream();
241         final IoReadFuture mockedReadFuture = asyncOut.read(null);
242
243         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
244             @Override
245             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
246                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
247                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
248                 result.operationComplete(mockedReadFuture);
249             }
250         });
251
252         final IoOutputStream asyncIn = getMockedIoOutputStream();
253         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
254         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
255         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
256
257         sshConnectListener.operationComplete(connectFuture);
258         sshAuthListener.operationComplete(getSuccessAuthFuture());
259         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
260
261         verify(ctx).fireChannelInactive();
262     }
263
264     @Test
265     public void testWrite() throws Exception {
266         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
267
268         final IoInputStream asyncOut = getMockedIoInputStream();
269         final IoOutputStream asyncIn = getMockedIoOutputStream();
270         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
271         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
272         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
273
274         sshConnectListener.operationComplete(connectFuture);
275         sshAuthListener.operationComplete(getSuccessAuthFuture());
276         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
277
278         final ChannelPromise writePromise = getMockedPromise();
279         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
280
281         verify(writePromise).setSuccess();
282     }
283
284     @Test
285     public void testWriteClosed() throws Exception {
286         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
287
288         final IoInputStream asyncOut = getMockedIoInputStream();
289         final IoOutputStream asyncIn = getMockedIoOutputStream();
290
291         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
292
293         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
294             @Override
295             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
296                 doReturn(false).when(ioWriteFuture).isWritten();
297                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
298                 doReturn(true).when(asyncIn).isClosing();
299                 doReturn(true).when(asyncIn).isClosed();
300                 result.operationComplete(ioWriteFuture);
301             }
302         });
303
304         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
305         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
306         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
307
308         sshConnectListener.operationComplete(connectFuture);
309         sshAuthListener.operationComplete(getSuccessAuthFuture());
310         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
311
312         final ChannelPromise writePromise = getMockedPromise();
313         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
314
315         verify(writePromise).setFailure(any(Throwable.class));
316     }
317
318     @Test
319     public void testWritePendingOne() throws Exception {
320         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
321
322         final IoInputStream asyncOut = getMockedIoInputStream();
323         final IoOutputStream asyncIn = getMockedIoOutputStream();
324         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
325
326         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
327         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
328         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
329
330         sshConnectListener.operationComplete(connectFuture);
331         sshAuthListener.operationComplete(getSuccessAuthFuture());
332         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
333
334         final ChannelPromise firstWritePromise = getMockedPromise();
335
336         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
337         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
338         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
339         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
340         // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
341         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
342
343         final ChannelPromise secondWritePromise = getMockedPromise();
344         // now make write throw pending exception
345         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
346         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
347
348         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
349
350         verifyZeroInteractions(firstWritePromise, secondWritePromise);
351
352         // make first write stop pending
353         firstWriteListener.operationComplete(ioWriteFuture);
354
355         // notify listener for second write that pending has ended
356         pendingListener.get().operationComplete(ioWriteFuture);
357
358         // verify both write promises successful
359         verify(firstWritePromise).setSuccess();
360         verify(secondWritePromise).setSuccess();
361     }
362
363     @Ignore("Pending queue is not limited")
364     @Test
365     public void testWritePendingMax() throws Exception {
366         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
367
368         final IoInputStream asyncOut = getMockedIoInputStream();
369         final IoOutputStream asyncIn = getMockedIoOutputStream();
370         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
371
372         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
373         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
374         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
375
376         sshConnectListener.operationComplete(connectFuture);
377         sshAuthListener.operationComplete(getSuccessAuthFuture());
378         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
379
380         final ChannelPromise firstWritePromise = getMockedPromise();
381
382         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
383         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
384         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
385
386         final ChannelPromise secondWritePromise = getMockedPromise();
387         // now make write throw pending exception
388         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
389         for (int i = 0; i < 1001; i++) {
390             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
391         }
392
393         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
394     }
395
396     @Test
397     public void testDisconnect() throws Exception {
398         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
399
400         final IoInputStream asyncOut = getMockedIoInputStream();
401         final IoOutputStream asyncIn = getMockedIoOutputStream();
402         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
403         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
404         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
405
406         sshConnectListener.operationComplete(connectFuture);
407         sshAuthListener.operationComplete(getSuccessAuthFuture());
408         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
409
410         final ChannelPromise disconnectPromise = getMockedPromise();
411         asyncSshHandler.disconnect(ctx, disconnectPromise);
412
413         verify(sshSession).close(anyBoolean());
414         verify(disconnectPromise).setSuccess();
415         verify(ctx).fireChannelInactive();
416     }
417
418     private OpenFuture getSuccessOpenFuture() {
419         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
420         doReturn(true).when(failedOpenFuture).isOpened();
421         return failedOpenFuture;
422     }
423
424     private AuthFuture getSuccessAuthFuture() {
425         final AuthFuture authFuture = mock(AuthFuture.class);
426         doReturn(true).when(authFuture).isSuccess();
427         return authFuture;
428     }
429
430     private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
431         final ConnectFuture connectFuture = mock(ConnectFuture.class);
432         doReturn(true).when(connectFuture).isConnected();
433
434         doReturn(sshSession).when(connectFuture).getSession();
435         return connectFuture;
436     }
437
438     private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
439         final ClientSession sshSession = mock(ClientSession.class);
440
441         doReturn("sshSession").when(sshSession).toString();
442         doReturn("serverVersion").when(sshSession).getServerVersion();
443         doReturn(false).when(sshSession).isClosed();
444         doReturn(false).when(sshSession).isClosing();
445         final CloseFuture closeFuture = mock(CloseFuture.class);
446         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
447             @Override
448             public void onSuccess(final SshFutureListener<CloseFuture> result) {
449                 doReturn(true).when(closeFuture).isClosed();
450                 result.operationComplete(closeFuture);
451             }
452         });
453         doReturn(closeFuture).when(sshSession).close(false);
454
455         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
456
457         return sshSession;
458     }
459
460     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
461         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
462         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
463         final OpenFuture openFuture = mock(OpenFuture.class);
464
465         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
466             @Override
467             public void onSuccess(final SshFutureListener<OpenFuture> result) {
468                 sshChannelOpenListener = result;
469             }
470         });
471
472         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
473
474         doReturn(openFuture).when(subsystemChannel).open();
475         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
476         return subsystemChannel;
477     }
478
479     private IoOutputStream getMockedIoOutputStream() {
480         final IoOutputStream mock = mock(IoOutputStream.class);
481         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
482         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
483         doReturn(true).when(ioWriteFuture).isWritten();
484
485         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
486             @Override
487             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
488                 result.operationComplete(ioWriteFuture);
489             }
490         });
491
492         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
493         doReturn(false).when(mock).isClosed();
494         doReturn(false).when(mock).isClosing();
495         return mock;
496     }
497
498     private IoInputStream getMockedIoInputStream() {
499         final IoInputStream mock = mock(IoInputStream.class);
500         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
501         doReturn(null).when(ioReadFuture).getException();
502         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
503         doReturn(5).when(ioReadFuture).getRead();
504         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
505         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
506
507         // Always success for read
508         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
509             @Override
510             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
511                 result.operationComplete(ioReadFuture);
512             }
513         });
514
515         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
516         doReturn(false).when(mock).isClosed();
517         doReturn(false).when(mock).isClosing();
518         return mock;
519     }
520
521     @Test
522     public void testConnectFailOpenChannel() throws Exception {
523         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
524
525         final IoInputStream asyncOut = getMockedIoInputStream();
526         final IoOutputStream asyncIn = getMockedIoOutputStream();
527         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
528         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
529         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
530
531         sshConnectListener.operationComplete(connectFuture);
532
533         sshAuthListener.operationComplete(getSuccessAuthFuture());
534
535         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
536
537         try {
538             sshChannelOpenListener.operationComplete(getFailedOpenFuture());
539             fail("Exception expected");
540         } catch (final Exception e) {
541             verify(promise).setFailure(any(Throwable.class));
542             verifyNoMoreInteractions(promise);
543             // TODO should ctx.channelInactive be called if we throw exception ?
544         }
545     }
546
547     @Test
548     public void testConnectFailAuth() throws Exception {
549         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
550
551         final ClientSession sshSession = mock(ClientSession.class);
552         doReturn(true).when(sshSession).isClosed();
553         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
554
555         sshConnectListener.operationComplete(connectFuture);
556
557         final AuthFuture authFuture = getFailedAuthFuture();
558
559         try {
560             sshAuthListener.operationComplete(authFuture);
561             fail("Exception expected");
562         } catch (final Exception e) {
563             verify(promise).setFailure(any(Throwable.class));
564             verifyNoMoreInteractions(promise);
565             // TODO should ctx.channelInactive be called ?
566         }
567     }
568
569     private AuthFuture getFailedAuthFuture() {
570         final AuthFuture authFuture = mock(AuthFuture.class);
571         doReturn(false).when(authFuture).isSuccess();
572         doReturn(new IllegalStateException()).when(authFuture).getException();
573         return authFuture;
574     }
575
576     private OpenFuture getFailedOpenFuture() {
577         final OpenFuture authFuture = mock(OpenFuture.class);
578         doReturn(false).when(authFuture).isOpened();
579         doReturn(new IllegalStateException()).when(authFuture).getException();
580         return authFuture;
581     }
582
583     @Test
584     public void testConnectFail() throws Exception {
585         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
586
587         final ConnectFuture connectFuture = getFailedConnectFuture();
588         try {
589             sshConnectListener.operationComplete(connectFuture);
590             fail("Exception expected");
591         } catch (final Exception e) {
592             verify(promise).setFailure(any(Throwable.class));
593             verifyNoMoreInteractions(promise);
594             // TODO should ctx.channelInactive be called ?
595         }
596     }
597
598     private ConnectFuture getFailedConnectFuture() {
599         final ConnectFuture connectFuture = mock(ConnectFuture.class);
600         doReturn(false).when(connectFuture).isConnected();
601         doReturn(new IllegalStateException()).when(connectFuture).getException();
602         return connectFuture;
603     }
604
605     private ChannelPromise getMockedPromise() {
606         final ChannelPromise promise = mock(ChannelPromise.class);
607         doReturn(promise).when(promise).setSuccess();
608         doReturn(promise).when(promise).setFailure(any(Throwable.class));
609         return promise;
610     }
611
612     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
613
614         @Override
615         public abstract void onSuccess(final SshFutureListener<T> result);
616
617         @Override
618         public void onFailure(final Throwable t) {
619             throw new RuntimeException(t);
620         }
621     }
622 }