b4c9e1e95083dd94b21d57318ef989dca355bdd2
[controller.git] / opendaylight / netconf / netconf-netty-util / src / test / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static org.junit.Assert.fail;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyBoolean;
14 import static org.mockito.Matchers.anyObject;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.mockito.Mockito.verifyNoMoreInteractions;
24 import static org.mockito.Mockito.verifyZeroInteractions;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.SettableFuture;
29 import io.netty.buffer.ByteBuf;
30 import io.netty.buffer.Unpooled;
31 import io.netty.channel.Channel;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPromise;
34 import java.io.IOException;
35 import java.net.SocketAddress;
36 import org.apache.sshd.ClientChannel;
37 import org.apache.sshd.ClientSession;
38 import org.apache.sshd.SshClient;
39 import org.apache.sshd.client.channel.ChannelSubsystem;
40 import org.apache.sshd.client.future.AuthFuture;
41 import org.apache.sshd.client.future.ConnectFuture;
42 import org.apache.sshd.client.future.OpenFuture;
43 import org.apache.sshd.common.future.CloseFuture;
44 import org.apache.sshd.common.future.SshFuture;
45 import org.apache.sshd.common.future.SshFutureListener;
46 import org.apache.sshd.common.io.IoInputStream;
47 import org.apache.sshd.common.io.IoOutputStream;
48 import org.apache.sshd.common.io.IoReadFuture;
49 import org.apache.sshd.common.io.IoWriteFuture;
50 import org.apache.sshd.common.util.Buffer;
51 import org.junit.After;
52 import org.junit.Before;
53 import org.junit.Ignore;
54 import org.junit.Test;
55 import org.mockito.Matchers;
56 import org.mockito.Mock;
57 import org.mockito.MockitoAnnotations;
58 import org.mockito.invocation.InvocationOnMock;
59 import org.mockito.stubbing.Answer;
60 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
61
62 public class AsyncSshHandlerTest {
63
64     @Mock
65     private SshClient sshClient;
66     @Mock
67     private AuthenticationHandler authHandler;
68     @Mock
69     private ChannelHandlerContext ctx;
70     @Mock
71     private Channel channel;
72     @Mock
73     private SocketAddress remoteAddress;
74     @Mock
75     private SocketAddress localAddress;
76
77     private AsyncSshHandler asyncSshHandler;
78
79     private SshFutureListener<ConnectFuture> sshConnectListener;
80     private SshFutureListener<AuthFuture> sshAuthListener;
81     private SshFutureListener<OpenFuture> sshChannelOpenListener;
82
83     private ChannelPromise promise;
84
85     @Before
86     public void setUp() throws Exception {
87         MockitoAnnotations.initMocks(this);
88         stubAuth();
89         stubSshClient();
90         stubChannel();
91         stubCtx();
92         stubRemoteAddress();
93
94         promise = getMockedPromise();
95
96         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
97     }
98
99     @After
100     public void tearDown() throws Exception {
101         sshConnectListener = null;
102         sshAuthListener = null;
103         sshChannelOpenListener = null;
104         promise = null;
105         asyncSshHandler.close(ctx, getMockedPromise());
106     }
107
108     private void stubAuth() throws IOException {
109         doReturn("usr").when(authHandler).getUsername();
110
111         final AuthFuture authFuture = mock(AuthFuture.class);
112         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
113             @Override
114             public void onSuccess(final SshFutureListener<AuthFuture> result) {
115                 sshAuthListener = result;
116             }
117         });
118         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
119     }
120
121     @SuppressWarnings("unchecked")
122     private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
123         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
124
125         doAnswer(new Answer<Object>() {
126             @Override
127             public Object answer(final InvocationOnMock invocation) throws Throwable {
128                 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
129                 return null;
130             }
131         }).when(future).addListener(any(SshFutureListener.class));
132
133         return listenerSettableFuture;
134     }
135
136     private void stubRemoteAddress() {
137         doReturn("remote").when(remoteAddress).toString();
138     }
139
140     private void stubCtx() {
141         doReturn(channel).when(ctx).channel();
142         doReturn(ctx).when(ctx).fireChannelActive();
143         doReturn(ctx).when(ctx).fireChannelInactive();
144         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
145         doReturn(getMockedPromise()).when(ctx).newPromise();
146     }
147
148     private void stubChannel() {
149         doReturn("channel").when(channel).toString();
150     }
151
152     private void stubSshClient() {
153         doNothing().when(sshClient).start();
154         final ConnectFuture connectFuture = mock(ConnectFuture.class);
155         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
156             @Override
157             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
158                 sshConnectListener = result;
159             }
160         });
161         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
162     }
163
164     @Test
165     public void testConnectSuccess() throws Exception {
166         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
167
168         final IoInputStream asyncOut = getMockedIoInputStream();
169         final IoOutputStream asyncIn = getMockedIoOutputStream();
170         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
171         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
172         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
173
174         sshConnectListener.operationComplete(connectFuture);
175         sshAuthListener.operationComplete(getSuccessAuthFuture());
176         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
177
178         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
179
180         verify(promise).setSuccess();
181         verifyNoMoreInteractions(promise);
182         verify(ctx).fireChannelActive();
183     }
184
185     @Test
186     public void testRead() throws Exception {
187         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
188
189         final IoInputStream asyncOut = getMockedIoInputStream();
190         final IoOutputStream asyncIn = getMockedIoOutputStream();
191         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
192         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
193         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
194
195         sshConnectListener.operationComplete(connectFuture);
196         sshAuthListener.operationComplete(getSuccessAuthFuture());
197         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
198
199         verify(ctx).fireChannelRead(any(ByteBuf.class));
200     }
201
202     @Test
203     public void testReadClosed() throws Exception {
204         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
205
206         final IoInputStream asyncOut = getMockedIoInputStream();
207         final IoReadFuture mockedReadFuture = asyncOut.read(null);
208
209         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
210             @Override
211             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
212                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
213                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
214                 doReturn(true).when(asyncOut).isClosing();
215                 doReturn(true).when(asyncOut).isClosed();
216                 result.operationComplete(mockedReadFuture);
217             }
218         });
219
220         final IoOutputStream asyncIn = getMockedIoOutputStream();
221         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
222         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
223         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
224
225         sshConnectListener.operationComplete(connectFuture);
226         sshAuthListener.operationComplete(getSuccessAuthFuture());
227         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
228
229         verify(ctx).fireChannelInactive();
230     }
231
232     @Test
233     public void testReadFail() throws Exception {
234         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
235
236         final IoInputStream asyncOut = getMockedIoInputStream();
237         final IoReadFuture mockedReadFuture = asyncOut.read(null);
238
239         Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
240             @Override
241             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
242                 doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
243                 doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
244                 result.operationComplete(mockedReadFuture);
245             }
246         });
247
248         final IoOutputStream asyncIn = getMockedIoOutputStream();
249         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
250         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
251         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
252
253         sshConnectListener.operationComplete(connectFuture);
254         sshAuthListener.operationComplete(getSuccessAuthFuture());
255         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
256
257         verify(ctx).fireChannelInactive();
258     }
259
260     @Test
261     public void testWrite() throws Exception {
262         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
263
264         final IoInputStream asyncOut = getMockedIoInputStream();
265         final IoOutputStream asyncIn = getMockedIoOutputStream();
266         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
267         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
268         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
269
270         sshConnectListener.operationComplete(connectFuture);
271         sshAuthListener.operationComplete(getSuccessAuthFuture());
272         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
273
274         final ChannelPromise writePromise = getMockedPromise();
275         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
276
277         verify(writePromise).setSuccess();
278     }
279
280     @Test
281     public void testWriteClosed() throws Exception {
282         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
283
284         final IoInputStream asyncOut = getMockedIoInputStream();
285         final IoOutputStream asyncIn = getMockedIoOutputStream();
286
287         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
288
289         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
290             @Override
291             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
292                 doReturn(false).when(ioWriteFuture).isWritten();
293                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
294                 doReturn(true).when(asyncIn).isClosing();
295                 doReturn(true).when(asyncIn).isClosed();
296                 result.operationComplete(ioWriteFuture);
297             }
298         });
299
300         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
301         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
302         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
303
304         sshConnectListener.operationComplete(connectFuture);
305         sshAuthListener.operationComplete(getSuccessAuthFuture());
306         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
307
308         final ChannelPromise writePromise = getMockedPromise();
309         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
310
311         verify(writePromise).setFailure(any(Throwable.class));
312     }
313
314     @Test
315     public void testWritePendingOne() throws Exception {
316         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
317
318         final IoInputStream asyncOut = getMockedIoInputStream();
319         final IoOutputStream asyncIn = getMockedIoOutputStream();
320         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
321
322         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
323         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
324         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
325
326         sshConnectListener.operationComplete(connectFuture);
327         sshAuthListener.operationComplete(getSuccessAuthFuture());
328         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
329
330         final ChannelPromise firstWritePromise = getMockedPromise();
331
332         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
333         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
334         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
335         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
336         // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
337         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
338
339         final ChannelPromise secondWritePromise = getMockedPromise();
340         // now make write throw pending exception
341         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
342         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
343
344         doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
345
346         verifyZeroInteractions(firstWritePromise, secondWritePromise);
347
348         // make first write stop pending
349         firstWriteListener.operationComplete(ioWriteFuture);
350
351         // notify listener for second write that pending has ended
352         pendingListener.get().operationComplete(ioWriteFuture);
353
354         // verify both write promises successful
355         verify(firstWritePromise).setSuccess();
356         verify(secondWritePromise).setSuccess();
357     }
358
359     @Ignore("Pending queue is not limited")
360     @Test
361     public void testWritePendingMax() throws Exception {
362         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
363
364         final IoInputStream asyncOut = getMockedIoInputStream();
365         final IoOutputStream asyncIn = getMockedIoOutputStream();
366         final IoWriteFuture ioWriteFuture = asyncIn.write(null);
367
368         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
369         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
370         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
371
372         sshConnectListener.operationComplete(connectFuture);
373         sshAuthListener.operationComplete(getSuccessAuthFuture());
374         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
375
376         final ChannelPromise firstWritePromise = getMockedPromise();
377
378         // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
379         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
380         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
381
382         final ChannelPromise secondWritePromise = getMockedPromise();
383         // now make write throw pending exception
384         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
385         for (int i = 0; i < 1001; i++) {
386             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
387         }
388
389         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
390     }
391
392     @Test
393     public void testDisconnect() throws Exception {
394         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
395
396         final IoInputStream asyncOut = getMockedIoInputStream();
397         final IoOutputStream asyncIn = getMockedIoOutputStream();
398         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
399         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
400         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
401
402         sshConnectListener.operationComplete(connectFuture);
403         sshAuthListener.operationComplete(getSuccessAuthFuture());
404         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
405
406         final ChannelPromise disconnectPromise = getMockedPromise();
407         asyncSshHandler.disconnect(ctx, disconnectPromise);
408
409         verify(sshSession).close(anyBoolean());
410         verify(disconnectPromise).setSuccess();
411         verify(ctx).fireChannelInactive();
412     }
413
414     private OpenFuture getSuccessOpenFuture() {
415         final OpenFuture failedOpenFuture = mock(OpenFuture.class);
416         doReturn(true).when(failedOpenFuture).isOpened();
417         return failedOpenFuture;
418     }
419
420     private AuthFuture getSuccessAuthFuture() {
421         final AuthFuture authFuture = mock(AuthFuture.class);
422         doReturn(true).when(authFuture).isSuccess();
423         return authFuture;
424     }
425
426     private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
427         final ConnectFuture connectFuture = mock(ConnectFuture.class);
428         doReturn(true).when(connectFuture).isConnected();
429
430         doReturn(sshSession).when(connectFuture).getSession();
431         return connectFuture;
432     }
433
434     private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
435         final ClientSession sshSession = mock(ClientSession.class);
436
437         doReturn("sshSession").when(sshSession).toString();
438         doReturn("serverVersion").when(sshSession).getServerVersion();
439         doReturn(false).when(sshSession).isClosed();
440         doReturn(false).when(sshSession).isClosing();
441         final CloseFuture closeFuture = mock(CloseFuture.class);
442         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
443             @Override
444             public void onSuccess(final SshFutureListener<CloseFuture> result) {
445                 doReturn(true).when(closeFuture).isClosed();
446                 result.operationComplete(closeFuture);
447             }
448         });
449         doReturn(closeFuture).when(sshSession).close(false);
450
451         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
452
453         return sshSession;
454     }
455
456     private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
457         final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
458         doReturn("subsystemChannel").when(subsystemChannel).toString();
459
460         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
461         final OpenFuture openFuture = mock(OpenFuture.class);
462
463         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
464             @Override
465             public void onSuccess(final SshFutureListener<OpenFuture> result) {
466                 sshChannelOpenListener = result;
467             }
468         });
469
470         doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
471
472         doReturn(openFuture).when(subsystemChannel).open();
473         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
474         return subsystemChannel;
475     }
476
477     private IoOutputStream getMockedIoOutputStream() {
478         final IoOutputStream mock = mock(IoOutputStream.class);
479         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
480         doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
481         doReturn(true).when(ioWriteFuture).isWritten();
482
483         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
484             @Override
485             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
486                 result.operationComplete(ioWriteFuture);
487             }
488         });
489
490         doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
491         doReturn(false).when(mock).isClosed();
492         doReturn(false).when(mock).isClosing();
493         return mock;
494     }
495
496     private IoInputStream getMockedIoInputStream() {
497         final IoInputStream mock = mock(IoInputStream.class);
498         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
499         doReturn(null).when(ioReadFuture).getException();
500         doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
501         doReturn(5).when(ioReadFuture).getRead();
502         doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
503         doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
504
505         // Always success for read
506         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
507             @Override
508             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
509                 result.operationComplete(ioReadFuture);
510             }
511         });
512
513         doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
514         doReturn(false).when(mock).isClosed();
515         doReturn(false).when(mock).isClosing();
516         return mock;
517     }
518
519     @Test
520     public void testConnectFailOpenChannel() throws Exception {
521         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
522
523         final IoInputStream asyncOut = getMockedIoInputStream();
524         final IoOutputStream asyncIn = getMockedIoOutputStream();
525         final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
526         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
527         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
528
529         sshConnectListener.operationComplete(connectFuture);
530
531         sshAuthListener.operationComplete(getSuccessAuthFuture());
532
533         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
534
535         try {
536             sshChannelOpenListener.operationComplete(getFailedOpenFuture());
537             fail("Exception expected");
538         } catch (final Exception e) {
539             verify(promise).setFailure(any(Throwable.class));
540             verifyNoMoreInteractions(promise);
541             // TODO should ctx.channelInactive be called if we throw exception ?
542         }
543     }
544
545     @Test
546     public void testConnectFailAuth() throws Exception {
547         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
548
549         final ClientSession sshSession = mock(ClientSession.class);
550         doReturn(true).when(sshSession).isClosed();
551         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
552
553         sshConnectListener.operationComplete(connectFuture);
554
555         final AuthFuture authFuture = getFailedAuthFuture();
556
557         try {
558             sshAuthListener.operationComplete(authFuture);
559             fail("Exception expected");
560         } catch (final Exception e) {
561             verify(promise).setFailure(any(Throwable.class));
562             verifyNoMoreInteractions(promise);
563             // TODO should ctx.channelInactive be called ?
564         }
565     }
566
567     private AuthFuture getFailedAuthFuture() {
568         final AuthFuture authFuture = mock(AuthFuture.class);
569         doReturn(false).when(authFuture).isSuccess();
570         doReturn(new IllegalStateException()).when(authFuture).getException();
571         return authFuture;
572     }
573
574     private OpenFuture getFailedOpenFuture() {
575         final OpenFuture authFuture = mock(OpenFuture.class);
576         doReturn(false).when(authFuture).isOpened();
577         doReturn(new IllegalStateException()).when(authFuture).getException();
578         return authFuture;
579     }
580
581     @Test
582     public void testConnectFail() throws Exception {
583         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
584
585         final ConnectFuture connectFuture = getFailedConnectFuture();
586         try {
587             sshConnectListener.operationComplete(connectFuture);
588             fail("Exception expected");
589         } catch (final Exception e) {
590             verify(promise).setFailure(any(Throwable.class));
591             verifyNoMoreInteractions(promise);
592             // TODO should ctx.channelInactive be called ?
593         }
594     }
595
596     private ConnectFuture getFailedConnectFuture() {
597         final ConnectFuture connectFuture = mock(ConnectFuture.class);
598         doReturn(false).when(connectFuture).isConnected();
599         doReturn(new IllegalStateException()).when(connectFuture).getException();
600         return connectFuture;
601     }
602
603     private ChannelPromise getMockedPromise() {
604         final ChannelPromise promise = mock(ChannelPromise.class);
605         doReturn(promise).when(promise).setSuccess();
606         doReturn(promise).when(promise).setFailure(any(Throwable.class));
607         return promise;
608     }
609
610     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
611
612         @Override
613         public abstract void onSuccess(final SshFutureListener<T> result);
614
615         @Override
616         public void onFailure(final Throwable t) {
617             throw new RuntimeException(t);
618         }
619     }
620 }