Add AsyncSshHandler.onConnectComplete()
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
index dbde8a140d36ebfbbe96d2617136253b69613f7f..0bb84e5a05cf989f1168964efde03934bab09dea 100644 (file)
@@ -5,13 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -20,51 +18,52 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
 import java.io.IOException;
 import java.net.SocketAddress;
-import org.apache.sshd.ClientChannel;
-import org.apache.sshd.ClientSession;
-import org.apache.sshd.SshClient;
-import org.apache.sshd.client.channel.ChannelSubsystem;
-import org.apache.sshd.client.future.AuthFuture;
-import org.apache.sshd.client.future.ConnectFuture;
-import org.apache.sshd.client.future.OpenFuture;
-import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.SshFuture;
-import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.util.Buffer;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.mockito.Matchers;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-
+import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
+import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
+import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
+import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
+import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
+import org.opendaylight.netconf.shaded.sshd.common.future.CloseFuture;
+import org.opendaylight.netconf.shaded.sshd.common.future.SshFuture;
+import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
+import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class AsyncSshHandlerTest {
 
     @Mock
-    private SshClient sshClient;
+    private NetconfSshClient sshClient;
     @Mock
     private AuthenticationHandler authHandler;
     @Mock
@@ -75,23 +74,22 @@ public class AsyncSshHandlerTest {
     private SocketAddress remoteAddress;
     @Mock
     private SocketAddress localAddress;
+    @Mock
+    private ChannelConfig channelConfig;
 
     private AsyncSshHandler asyncSshHandler;
 
     private SshFutureListener<ConnectFuture> sshConnectListener;
     private SshFutureListener<AuthFuture> sshAuthListener;
     private SshFutureListener<OpenFuture> sshChannelOpenListener;
-
     private ChannelPromise promise;
 
     @Before
     public void setUp() throws Exception {
-        MockitoAnnotations.initMocks(this);
         stubAuth();
         stubSshClient();
         stubChannel();
         stubCtx();
-        stubRemoteAddress();
 
         promise = getMockedPromise();
 
@@ -116,7 +114,7 @@ public class AsyncSshHandlerTest {
             public void onSuccess(final SshFutureListener<AuthFuture> result) {
                 sshAuthListener = result;
             }
-        });
+        }, MoreExecutors.directExecutor());
         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
     }
 
@@ -124,26 +122,18 @@ public class AsyncSshHandlerTest {
     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
 
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(final InvocationOnMock invocation) throws Throwable {
-                listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
-                return null;
-            }
+        doAnswer(invocation -> {
+            listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
+            return null;
         }).when(future).addListener(any(SshFutureListener.class));
 
         return listenerSettableFuture;
     }
 
-    private void stubRemoteAddress() {
-        doReturn("remote").when(remoteAddress).toString();
-    }
-
     private void stubCtx() {
         doReturn(channel).when(ctx).channel();
         doReturn(ctx).when(ctx).fireChannelActive();
         doReturn(ctx).when(ctx).fireChannelInactive();
-        doReturn(ctx).when(ctx).fireChannelRead(anyObject());
         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
         doReturn(getMockedPromise()).when(ctx).newPromise();
     }
@@ -152,16 +142,18 @@ public class AsyncSshHandlerTest {
         doReturn("channel").when(channel).toString();
     }
 
-    private void stubSshClient() {
-        doNothing().when(sshClient).start();
+    private void stubSshClient() throws IOException {
         final ConnectFuture connectFuture = mock(ConnectFuture.class);
         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
             @Override
             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
                 sshConnectListener = result;
             }
-        });
+        }, MoreExecutors.directExecutor());
         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
+        doReturn(channelConfig).when(channel).config();
+        doReturn(1).when(channelConfig).getConnectTimeoutMillis();
+        doReturn(connectFuture).when(connectFuture).verify(1,TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -170,7 +162,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -182,80 +174,7 @@ public class AsyncSshHandlerTest {
 
         verify(promise).setSuccess();
         verify(ctx).fireChannelActive();
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
-
-        final IoInputStream asyncOut = getMockedIoInputStream();
-        final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
-        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
-        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
-
-        sshConnectListener.operationComplete(connectFuture);
-        sshAuthListener.operationComplete(getSuccessAuthFuture());
-        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
-
-        verify(ctx).fireChannelRead(any(ByteBuf.class));
-    }
-
-    @Test
-    public void testReadClosed() throws Exception {
-        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
-
-        final IoInputStream asyncOut = getMockedIoInputStream();
-        final IoReadFuture mockedReadFuture = asyncOut.read(null);
-
-        Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
-            @Override
-            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
-                doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
-                doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
-                doReturn(true).when(asyncOut).isClosing();
-                doReturn(true).when(asyncOut).isClosed();
-                result.operationComplete(mockedReadFuture);
-            }
-        });
-
-        final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
-        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
-        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
-
-        sshConnectListener.operationComplete(connectFuture);
-        sshAuthListener.operationComplete(getSuccessAuthFuture());
-        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
-
-        verify(ctx).fireChannelInactive();
-    }
-
-    @Test
-    public void testReadFail() throws Exception {
-        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
-
-        final IoInputStream asyncOut = getMockedIoInputStream();
-        final IoReadFuture mockedReadFuture = asyncOut.read(null);
-
-        Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
-            @Override
-            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
-                doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
-                doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
-                result.operationComplete(mockedReadFuture);
-            }
-        });
-
-        final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
-        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
-        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
-
-        sshConnectListener.operationComplete(connectFuture);
-        sshAuthListener.operationComplete(getSuccessAuthFuture());
-        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
-
+        asyncSshHandler.close(ctx, getMockedPromise());
         verify(ctx).fireChannelInactive();
     }
 
@@ -265,7 +184,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -286,20 +205,18 @@ public class AsyncSshHandlerTest {
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
 
-        final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+        final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
 
         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
             @Override
             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
                 doReturn(false).when(ioWriteFuture).isWritten();
                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
-                doReturn(true).when(asyncIn).isClosing();
-                doReturn(true).when(asyncIn).isClosed();
                 result.operationComplete(ioWriteFuture);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -319,9 +236,9 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+        final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
 
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -331,21 +248,22 @@ public class AsyncSshHandlerTest {
 
         final ChannelPromise firstWritePromise = getMockedPromise();
 
-        // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
-        final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+        // intercept listener for first write,
+        // so we can invoke successful write later thus simulate pending of the first write
+        final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
+                stubAddListener(ioWriteFuture);
         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
-        // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
+        // intercept second listener,
+        // this is the listener for pending write for the pending write to know when pending state ended
         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
 
         final ChannelPromise secondWritePromise = getMockedPromise();
-        // now make write throw pending exception
-        doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
 
-        doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
+        doReturn(ioWriteFuture).when(asyncIn).writeBuffer(any(Buffer.class));
 
-        verifyZeroInteractions(firstWritePromise, secondWritePromise);
+        verifyNoMoreInteractions(firstWritePromise, secondWritePromise);
 
         // make first write stop pending
         firstWriteListener.operationComplete(ioWriteFuture);
@@ -365,9 +283,9 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+        final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
 
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -377,13 +295,15 @@ public class AsyncSshHandlerTest {
 
         final ChannelPromise firstWritePromise = getMockedPromise();
 
-        // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
-        final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+        // intercept listener for first write,
+        // so we can invoke successful write later thus simulate pending of the first write
+        final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
+                stubAddListener(ioWriteFuture);
         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
 
         final ChannelPromise secondWritePromise = getMockedPromise();
         // now make write throw pending exception
-        doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+        doThrow(WritePendingException.class).when(asyncIn).writeBuffer(any(Buffer.class));
         for (int i = 0; i < 1001; i++) {
             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
         }
@@ -397,7 +317,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -410,7 +330,7 @@ public class AsyncSshHandlerTest {
 
         verify(sshSession).close(anyBoolean());
         verify(disconnectPromise).setSuccess();
-        verify(ctx).fireChannelInactive();
+        //verify(ctx).fireChannelInactive();
     }
 
     private static OpenFuture getSuccessOpenFuture() {
@@ -427,16 +347,16 @@ public class AsyncSshHandlerTest {
 
     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
         final ConnectFuture connectFuture = mock(ConnectFuture.class);
-        doReturn(true).when(connectFuture).isConnected();
+        doReturn(null).when(connectFuture).getException();
 
         doReturn(sshSession).when(connectFuture).getSession();
         return connectFuture;
     }
 
-    private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
-        final ClientSession sshSession = mock(ClientSession.class);
+    private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
+            throws IOException {
+        final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
 
-        doReturn("sshSession").when(sshSession).toString();
         doReturn("serverVersion").when(sshSession).getServerVersion();
         doReturn(false).when(sshSession).isClosed();
         doReturn(false).when(sshSession).isClosing();
@@ -447,17 +367,17 @@ public class AsyncSshHandlerTest {
                 doReturn(true).when(closeFuture).isClosed();
                 result.operationComplete(closeFuture);
             }
-        });
+        }, MoreExecutors.directExecutor());
         doReturn(closeFuture).when(sshSession).close(false);
 
-        doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
+        doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(eq("netconf"), any());
 
         return sshSession;
     }
 
-    private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
-        final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
-        doReturn("subsystemChannel").when(subsystemChannel).toString();
+    private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
+                                                       final IoOutputStream asyncIn) throws IOException {
+        final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
 
         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
         final OpenFuture openFuture = mock(OpenFuture.class);
@@ -467,19 +387,18 @@ public class AsyncSshHandlerTest {
             public void onSuccess(final SshFutureListener<OpenFuture> result) {
                 sshChannelOpenListener = result;
             }
-        });
-
-        doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
+        }, MoreExecutors.directExecutor());
 
         doReturn(openFuture).when(subsystemChannel).open();
         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
+        doNothing().when(subsystemChannel).onClose(any());
+        doNothing().when(subsystemChannel).close();
         return subsystemChannel;
     }
 
-    private static IoOutputStream getMockedIoOutputStream() {
+    private static IoOutputStream getMockedIoOutputStream() throws IOException {
         final IoOutputStream mock = mock(IoOutputStream.class);
         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
-        doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
         doReturn(true).when(ioWriteFuture).isWritten();
 
         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
@@ -487,9 +406,9 @@ public class AsyncSshHandlerTest {
             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
                 result.operationComplete(ioWriteFuture);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
-        doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
+        doReturn(ioWriteFuture).when(mock).writeBuffer(any(Buffer.class));
         doReturn(false).when(mock).isClosed();
         doReturn(false).when(mock).isClosing();
         return mock;
@@ -498,23 +417,13 @@ public class AsyncSshHandlerTest {
     private static IoInputStream getMockedIoInputStream() {
         final IoInputStream mock = mock(IoInputStream.class);
         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
-        doReturn(null).when(ioReadFuture).getException();
-        doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
-        doReturn(5).when(ioReadFuture).getRead();
-        doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
-        doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
-
         // Always success for read
         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
             @Override
             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
                 result.operationComplete(ioReadFuture);
             }
-        });
-
-        doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
-        doReturn(false).when(mock).isClosed();
-        doReturn(false).when(mock).isClosing();
+        }, MoreExecutors.directExecutor());
         return mock;
     }
 
@@ -524,7 +433,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -542,7 +451,7 @@ public class AsyncSshHandlerTest {
     public void testConnectFailAuth() throws Exception {
         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
 
-        final ClientSession sshSession = mock(ClientSession.class);
+        final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
         doReturn(true).when(sshSession).isClosed();
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -552,6 +461,8 @@ public class AsyncSshHandlerTest {
 
         sshAuthListener.operationComplete(authFuture);
         verify(promise).setFailure(any(Throwable.class));
+        asyncSshHandler.close(ctx, getMockedPromise());
+        verify(ctx, times(0)).fireChannelInactive();
     }
 
     private static AuthFuture getFailedAuthFuture() {
@@ -579,7 +490,6 @@ public class AsyncSshHandlerTest {
 
     private static ConnectFuture getFailedConnectFuture() {
         final ConnectFuture connectFuture = mock(ConnectFuture.class);
-        doReturn(false).when(connectFuture).isConnected();
         doReturn(new IllegalStateException()).when(connectFuture).getException();
         return connectFuture;
     }
@@ -588,14 +498,15 @@ public class AsyncSshHandlerTest {
         return spy(new DefaultChannelPromise(channel));
     }
 
-    private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
+    private abstract static class SuccessFutureListener<T extends SshFuture<T>>
+            implements FutureCallback<SshFutureListener<T>> {
 
         @Override
-        public abstract void onSuccess(final SshFutureListener<T> result);
+        public abstract void onSuccess(SshFutureListener<T> result);
 
         @Override
-        public void onFailure(final Throwable t) {
-            throw new RuntimeException(t);
+        public void onFailure(final Throwable throwable) {
+            throw new RuntimeException(throwable);
         }
     }
 }