Merge "BUG 1839 - HTTP delete of non existing data"
[controller.git] / opendaylight / netconf / netconf-netty-util / src / test / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static org.junit.Assert.fail;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyBoolean;
14 import static org.mockito.Matchers.anyObject;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24 import static org.mockito.Mockito.verifyZeroInteractions;
25
26 import java.io.IOException;
27 import java.net.SocketAddress;
28
29 import org.apache.sshd.ClientChannel;
30 import org.apache.sshd.ClientSession;
31 import org.apache.sshd.SshClient;
32 import org.apache.sshd.client.channel.ChannelSubsystem;
33 import org.apache.sshd.client.future.AuthFuture;
34 import org.apache.sshd.client.future.ConnectFuture;
35 import org.apache.sshd.client.future.OpenFuture;
36 import org.apache.sshd.common.future.CloseFuture;
37 import org.apache.sshd.common.future.SshFuture;
38 import org.apache.sshd.common.future.SshFutureListener;
39 import org.apache.sshd.common.io.IoInputStream;
40 import org.apache.sshd.common.io.IoOutputStream;
41 import org.apache.sshd.common.io.IoReadFuture;
42 import org.apache.sshd.common.io.IoWriteFuture;
43 import org.apache.sshd.common.util.Buffer;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Ignore;
47 import org.junit.Test;
48 import org.mockito.Matchers;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
54
55 import com.google.common.util.concurrent.FutureCallback;
56 import com.google.common.util.concurrent.Futures;
57 import com.google.common.util.concurrent.ListenableFuture;
58 import com.google.common.util.concurrent.SettableFuture;
59
60 import io.netty.buffer.ByteBuf;
61 import io.netty.buffer.Unpooled;
62 import io.netty.channel.Channel;
63 import io.netty.channel.ChannelHandlerContext;
64 import io.netty.channel.ChannelPromise;
65
66 public class AsyncSshHandlerTest {
67
68     @Mock
69     private SshClient sshClient;
70     @Mock
71     private AuthenticationHandler authHandler;
72     @Mock
73     private ChannelHandlerContext ctx;
74     @Mock
75     private Channel channel;
76     @Mock
77     private SocketAddress remoteAddress;
78     @Mock
79     private SocketAddress localAddress;
80
81     private AsyncSshHandler asyncSshHandler;
82
83     private SshFutureListener<ConnectFuture> sshConnectListener;
84     private SshFutureListener<AuthFuture> sshAuthListener;
85     private SshFutureListener<OpenFuture> sshChannelOpenListener;
86
87     private ChannelPromise promise;
88
89     @Before
90     public void setUp() throws Exception {
91         MockitoAnnotations.initMocks(this);
92         stubAuth();
93         stubSshClient();
94         stubChannel();
95         stubCtx();
96         stubRemoteAddress();
97
98         promise = getMockedPromise();
99
100         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
101     }
102
103     @After
104     public void tearDown() throws Exception {
105         sshConnectListener = null;
106         sshAuthListener = null;
107         sshChannelOpenListener = null;
108         promise = null;
109         asyncSshHandler.close(ctx, getMockedPromise());
110     }
111
112     private void stubAuth() throws IOException {
113         doReturn("usr").when(authHandler).getUsername();
114
115         final AuthFuture authFuture = mock(AuthFuture.class);
116         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
117             @Override
118             public void onSuccess(final SshFutureListener<AuthFuture> result) {
119                 sshAuthListener = result;
120             }
121         });
122         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
123     }
124
125     @SuppressWarnings("unchecked")
126     private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
127         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
128
129         doAnswer(new Answer() {
130             @Override
131             public Object answer(final InvocationOnMock invocation) throws Throwable {
132                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
133                 return null;
134             }
135         }).when(future).addListener(any(SshFutureListener.class));
136
137         return listenerSettableFuture;
138     }
139
140     private void stubRemoteAddress() {
141         doReturn("remote").when(remoteAddress).toString();
142     }
143
144     private void stubCtx() {
145         doReturn(channel).when(ctx).channel();
146         doReturn(ctx).when(ctx).fireChannelActive();
147         doReturn(ctx).when(ctx).fireChannelInactive();
148         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
149         doReturn(getMockedPromise()).when(ctx).newPromise();
150     }
151
152     private void stubChannel() {
153         doReturn("channel").when(channel).toString();
154     }
155
156     private void stubSshClient() {
157         doNothing().when(sshClient).start();
158         final ConnectFuture connectFuture = mock(ConnectFuture.class);
159         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
160             @Override
161             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
162                 sshConnectListener = result;
163             }
164         });
165         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
166     }
167
168     @Test
169     public void testConnectSuccess() throws Exception {
170         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
171
172         final IoInputStream asyncOut = getMockedIoInputStream();
173         final IoOutputStream asyncIn = getMockedIoOutputStream();
174         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
175         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
176         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
177
178         sshConnectListener.operationComplete(connectFuture);
179         sshAuthListener.operationComplete(getSuccessAuthFuture());
180         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
181
182         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
183
184         verify(promise).setSuccess();
185         verifyNoMoreInteractions(promise);
186         verify(ctx).fireChannelActive();
187     }
188
189     @Test
190     public void testRead() throws Exception {
191         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
192
193         final IoInputStream asyncOut = getMockedIoInputStream();
194         final IoOutputStream asyncIn = getMockedIoOutputStream();
195         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
198
199         sshConnectListener.operationComplete(connectFuture);
200         sshAuthListener.operationComplete(getSuccessAuthFuture());
201         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
202
203         verify(ctx).fireChannelRead(any(ByteBuf.class));
204     }
205
206     @Test
207     public void testReadClosed() throws Exception {
208         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
209
210         final IoInputStream asyncOut = getMockedIoInputStream();
211         final IoReadFuture mockedReadFuture = asyncOut.read(null);
212
213         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
214             @Override
215             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
216                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
217                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
218                 doReturn(true).when(asyncOut).isClosing();
219                 doReturn(true).when(asyncOut).isClosed();
220                 result.operationComplete(mockedReadFuture);
221             }
222         });
223
224         final IoOutputStream asyncIn = getMockedIoOutputStream();
225         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
226         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
227         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
228
229         sshConnectListener.operationComplete(connectFuture);
230         sshAuthListener.operationComplete(getSuccessAuthFuture());
231         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
232
233         verify(ctx).fireChannelInactive();
234     }
235
236     @Test
237     public void testReadFail() throws Exception {
238         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
239
240         final IoInputStream asyncOut = getMockedIoInputStream();
241         final IoReadFuture mockedReadFuture = asyncOut.read(null);
242
243         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
244             @Override
245             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
246                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
247                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
248                 result.operationComplete(mockedReadFuture);
249             }
250         });
251
252         final IoOutputStream asyncIn = getMockedIoOutputStream();
253         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
254         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
255         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
256
257         sshConnectListener.operationComplete(connectFuture);
258         sshAuthListener.operationComplete(getSuccessAuthFuture());
259         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
260
261         verify(ctx).fireChannelInactive();
262     }
263
264     @Test
265     public void testWrite() throws Exception {
266         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
267
268         final IoInputStream asyncOut = getMockedIoInputStream();
269         final IoOutputStream asyncIn = getMockedIoOutputStream();
270         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
271         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
272         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
273
274         sshConnectListener.operationComplete(connectFuture);
275         sshAuthListener.operationComplete(getSuccessAuthFuture());
276         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
277
278         final ChannelPromise writePromise = getMockedPromise();
279         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
280
281         verify(writePromise).setSuccess();
282     }
283
284     @Test
285     public void testWriteClosed() throws Exception {
286         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
287
288         final IoInputStream asyncOut = getMockedIoInputStream();
289         final IoOutputStream asyncIn = getMockedIoOutputStream();
290
291         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
292
293         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
294             @Override
295             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
296                 doReturn(false).when(ioWriteFuture).isWritten();
297                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
298                 doReturn(true).when(asyncIn).isClosing();
299                 doReturn(true).when(asyncIn).isClosed();
300                 result.operationComplete(ioWriteFuture);
301             }
302         });
303
304         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
305         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
306         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
307
308         sshConnectListener.operationComplete(connectFuture);
309         sshAuthListener.operationComplete(getSuccessAuthFuture());
310         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
311
312         final ChannelPromise writePromise = getMockedPromise();
313         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
314
315         verify(writePromise).setFailure(any(Throwable.class));
316     }
317
318     @Test
319     public void testWritePendingOne() throws Exception {
320         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
321
322         final IoInputStream asyncOut = getMockedIoInputStream();
323         final IoOutputStream asyncIn = getMockedIoOutputStream();
324         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
325
326         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
327         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
328         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
329
330         sshConnectListener.operationComplete(connectFuture);
331         sshAuthListener.operationComplete(getSuccessAuthFuture());
332         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
333
334         final ChannelPromise firstWritePromise = getMockedPromise();
335
336         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
337         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
338         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
339         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
340         // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
341         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
342
343         final ChannelPromise secondWritePromise = getMockedPromise();
344         // now make write throw pending exception
345         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
346         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
347
348         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
349
350         verifyZeroInteractions(firstWritePromise, secondWritePromise);
351
352         // make first write stop pending
353         firstWriteListener.operationComplete(ioWriteFuture);
354
355         // notify listener for second write that pending has ended
356         pendingListener.get().operationComplete(ioWriteFuture);
357
358         // verify both write promises successful
359         verify(firstWritePromise).setSuccess();
360         verify(secondWritePromise).setSuccess();
361     }
362
363     @Ignore("Pending queue is not limited")
364     @Test
365     public void testWritePendingMax() throws Exception {
366         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
367
368         final IoInputStream asyncOut = getMockedIoInputStream();
369         final IoOutputStream asyncIn = getMockedIoOutputStream();
370         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
371
372         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
373         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
374         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
375
376         sshConnectListener.operationComplete(connectFuture);
377         sshAuthListener.operationComplete(getSuccessAuthFuture());
378         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
379
380         final ChannelPromise firstWritePromise = getMockedPromise();
381
382         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
383         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
384         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
385
386         final ChannelPromise secondWritePromise = getMockedPromise();
387         // now make write throw pending exception
388         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
389         for (int i = 0; i < 1001; i++) {
390             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
391         }
392
393         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
394     }
395
396     @Test
397     public void testDisconnect() throws Exception {
398         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
399
400         final IoInputStream asyncOut = getMockedIoInputStream();
401         final IoOutputStream asyncIn = getMockedIoOutputStream();
402         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
403         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
404         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
405
406         sshConnectListener.operationComplete(connectFuture);
407         sshAuthListener.operationComplete(getSuccessAuthFuture());
408         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
409
410         final ChannelPromise disconnectPromise = getMockedPromise();
411         asyncSshHandler.disconnect(ctx, disconnectPromise);
412
413         verify(sshSession).close(anyBoolean());
414         verify(disconnectPromise).setSuccess();
415         verify(ctx).fireChannelInactive();
416     }
417
418     private OpenFuture getSuccessOpenFuture() {
419         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
420         doReturn(true).when(failedOpenFuture).isOpened();
421         return failedOpenFuture;
422     }
423
424     private AuthFuture getSuccessAuthFuture() {
425         final AuthFuture authFuture = mock(AuthFuture.class);
426         doReturn(true).when(authFuture).isSuccess();
427         return authFuture;
428     }
429
430     private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
431         final ConnectFuture connectFuture = mock(ConnectFuture.class);
432         doReturn(true).when(connectFuture).isConnected();
433
434         doReturn(sshSession).when(connectFuture).getSession();
435         return connectFuture;
436     }
437
438     private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
439         final ClientSession sshSession = mock(ClientSession.class);
440
441         doReturn("sshSession").when(sshSession).toString();
442         doReturn("serverVersion").when(sshSession).getServerVersion();
443         doReturn(false).when(sshSession).isClosed();
444         doReturn(false).when(sshSession).isClosing();
445         final CloseFuture closeFuture = mock(CloseFuture.class);
446         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
447             @Override
448             public void onSuccess(final SshFutureListener<CloseFuture> result) {
449                 doReturn(true).when(closeFuture).isClosed();
450                 result.operationComplete(closeFuture);
451             }
452         });
453         doReturn(closeFuture).when(sshSession).close(false);
454
455         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
456
457         return sshSession;
458     }
459
460     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
461         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
462         doReturn("subsystemChannel").when(subsystemChannel).toString();
463
464         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
465         final OpenFuture openFuture = mock(OpenFuture.class);
466
467         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
468             @Override
469             public void onSuccess(final SshFutureListener<OpenFuture> result) {
470                 sshChannelOpenListener = result;
471             }
472         });
473
474         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
475
476         doReturn(openFuture).when(subsystemChannel).open();
477         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
478         return subsystemChannel;
479     }
480
481     private IoOutputStream getMockedIoOutputStream() {
482         final IoOutputStream mock = mock(IoOutputStream.class);
483         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
484         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
485         doReturn(true).when(ioWriteFuture).isWritten();
486
487         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
488             @Override
489             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
490                 result.operationComplete(ioWriteFuture);
491             }
492         });
493
494         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
495         doReturn(false).when(mock).isClosed();
496         doReturn(false).when(mock).isClosing();
497         return mock;
498     }
499
500     private IoInputStream getMockedIoInputStream() {
501         final IoInputStream mock = mock(IoInputStream.class);
502         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
503         doReturn(null).when(ioReadFuture).getException();
504         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
505         doReturn(5).when(ioReadFuture).getRead();
506         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
507         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
508
509         // Always success for read
510         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
511             @Override
512             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
513                 result.operationComplete(ioReadFuture);
514             }
515         });
516
517         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
518         doReturn(false).when(mock).isClosed();
519         doReturn(false).when(mock).isClosing();
520         return mock;
521     }
522
523     @Test
524     public void testConnectFailOpenChannel() throws Exception {
525         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
526
527         final IoInputStream asyncOut = getMockedIoInputStream();
528         final IoOutputStream asyncIn = getMockedIoOutputStream();
529         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
530         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
531         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
532
533         sshConnectListener.operationComplete(connectFuture);
534
535         sshAuthListener.operationComplete(getSuccessAuthFuture());
536
537         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
538
539         try {
540             sshChannelOpenListener.operationComplete(getFailedOpenFuture());
541             fail("Exception expected");
542         } catch (final Exception e) {
543             verify(promise).setFailure(any(Throwable.class));
544             verifyNoMoreInteractions(promise);
545             // TODO should ctx.channelInactive be called if we throw exception ?
546         }
547     }
548
549     @Test
550     public void testConnectFailAuth() throws Exception {
551         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
552
553         final ClientSession sshSession = mock(ClientSession.class);
554         doReturn(true).when(sshSession).isClosed();
555         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
556
557         sshConnectListener.operationComplete(connectFuture);
558
559         final AuthFuture authFuture = getFailedAuthFuture();
560
561         try {
562             sshAuthListener.operationComplete(authFuture);
563             fail("Exception expected");
564         } catch (final Exception e) {
565             verify(promise).setFailure(any(Throwable.class));
566             verifyNoMoreInteractions(promise);
567             // TODO should ctx.channelInactive be called ?
568         }
569     }
570
571     private AuthFuture getFailedAuthFuture() {
572         final AuthFuture authFuture = mock(AuthFuture.class);
573         doReturn(false).when(authFuture).isSuccess();
574         doReturn(new IllegalStateException()).when(authFuture).getException();
575         return authFuture;
576     }
577
578     private OpenFuture getFailedOpenFuture() {
579         final OpenFuture authFuture = mock(OpenFuture.class);
580         doReturn(false).when(authFuture).isOpened();
581         doReturn(new IllegalStateException()).when(authFuture).getException();
582         return authFuture;
583     }
584
585     @Test
586     public void testConnectFail() throws Exception {
587         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
588
589         final ConnectFuture connectFuture = getFailedConnectFuture();
590         try {
591             sshConnectListener.operationComplete(connectFuture);
592             fail("Exception expected");
593         } catch (final Exception e) {
594             verify(promise).setFailure(any(Throwable.class));
595             verifyNoMoreInteractions(promise);
596             // TODO should ctx.channelInactive be called ?
597         }
598     }
599
600     private ConnectFuture getFailedConnectFuture() {
601         final ConnectFuture connectFuture = mock(ConnectFuture.class);
602         doReturn(false).when(connectFuture).isConnected();
603         doReturn(new IllegalStateException()).when(connectFuture).getException();
604         return connectFuture;
605     }
606
607     private ChannelPromise getMockedPromise() {
608         final ChannelPromise promise = mock(ChannelPromise.class);
609         doReturn(promise).when(promise).setSuccess();
610         doReturn(promise).when(promise).setFailure(any(Throwable.class));
611         return promise;
612     }
613
614     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
615
616         @Override
617         public abstract void onSuccess(final SshFutureListener<T> result);
618
619         @Override
620         public void onFailure(final Throwable t) {
621             throw new RuntimeException(t);
622         }
623     }
624 }