BUG-1949 Fix race condition in AsyncSshHandler 29/11329/3
authorMaros Marsalek <mmarsale@cisco.com>
Thu, 18 Sep 2014 15:58:48 +0000 (17:58 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Fri, 19 Sep 2014 15:11:47 +0000 (17:11 +0200)
Remove pending limit due to unexpected behaviour with chunked messages.
Extract Reader/Writer into separate classes.

Also lower the amount of requests sent in SSH Stress integration test.

Change-Id: Idff719ac3a6bed9e8939efa01b8306f2884848fe
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java

index 25251a9..1adcd7e 100644 (file)
@@ -107,10 +107,9 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
     /**
      * Test all requests are handled properly and no mismatch occurs in listener
      */
-    /* Disabled until fixed
-    @Test(timeout = 5*60*1000)
+    @Test(timeout = 6*60*1000)
     public void testSecureStress() throws Exception {
-        final int requests = 10000;
+        final int requests = 4000;
 
         final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
         final NetconfDeviceCommunicator sessionListener = getSessionListener();
@@ -152,7 +151,7 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest {
             assertEquals(requests, responseCounter.get());
         }
     }
-    */
+
     private NetconfMessage changeMessageId(final NetconfMessage getConfig, final int i) throws IOException, SAXException {
         String s = XmlUtil.toString(getConfig.getDocument(), false);
         s = s.replace("101", Integer.toString(i));
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java
new file mode 100644 (file)
index 0000000..73a24f2
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener on async input stream from SSH session.
+ * This listeners schedules reads in a loop until the session is closed or read fails.
+ */
+final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+
+    private static final int BUFFER_SIZE = 8192;
+
+    private final ChannelOutboundHandler asyncSshHandler;
+    private final ChannelHandlerContext ctx;
+
+    private IoInputStream asyncOut;
+    private Buffer buf;
+    private IoReadFuture currentReadFuture;
+
+    public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+        this.asyncSshHandler = asyncSshHandler;
+        this.ctx = ctx;
+        this.asyncOut = asyncOut;
+        buf = new Buffer(BUFFER_SIZE);
+        asyncOut.read(buf).addListener(this);
+    }
+
+    @Override
+    public synchronized void operationComplete(final IoReadFuture future) {
+        if(future.getException() != null) {
+            if(asyncOut.isClosed() || asyncOut.isClosing()) {
+                // Ssh dropped
+                logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+            } else {
+                logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+            }
+            invokeDisconnect();
+            return;
+        }
+
+        if (future.getRead() > 0) {
+            ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+
+            // Schedule next read
+            buf = new Buffer(BUFFER_SIZE);
+            currentReadFuture = asyncOut.read(buf);
+            currentReadFuture.addListener(this);
+        }
+    }
+
+    private void invokeDisconnect() {
+        try {
+            asyncSshHandler.disconnect(ctx, ctx.newPromise());
+        } catch (final Exception e) {
+            // This should not happen
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        // Remove self as listener on close to prevent reading from closed input
+        if(currentReadFuture != null) {
+            currentReadFuture.removeListener(this);
+        }
+
+        asyncOut = null;
+    }
+}
index 3d1e478..3bd7232 100644 (file)
@@ -9,10 +9,7 @@
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -25,12 +22,6 @@ import org.apache.sshd.client.future.ConnectFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,8 +47,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private final AuthenticationHandler authenticationHandler;
     private final SshClient sshClient;
 
-    private SshReadAsyncListener sshReadAsyncListener;
-    private SshWriteAsyncHandler sshWriteAsyncHandler;
+    private AsyncSshHanderReader sshReadAsyncListener;
+    private AsyncSshHandlerWriter sshWriteAsyncHandler;
 
     private ClientChannel channel;
     private ClientSession session;
@@ -147,10 +138,10 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise.setSuccess();
         connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+        sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
         // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
         if(channel != null) {
-            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
             ctx.fireChannelActive();
         }
     }
@@ -207,178 +198,4 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         ctx.fireChannelInactive();
     }
 
-    /**
-     * Listener over async input stream from SSH session.
-     * This listeners schedules reads in a loop until the session is closed or read fails.
-     */
-    private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
-        private static final int BUFFER_SIZE = 8192;
-
-        private final ChannelOutboundHandler asyncSshHandler;
-        private final ChannelHandlerContext ctx;
-
-        private IoInputStream asyncOut;
-        private Buffer buf;
-        private IoReadFuture currentReadFuture;
-
-        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
-            this.asyncSshHandler = asyncSshHandler;
-            this.ctx = ctx;
-            this.asyncOut = asyncOut;
-            buf = new Buffer(BUFFER_SIZE);
-            asyncOut.read(buf).addListener(this);
-        }
-
-        @Override
-        public synchronized void operationComplete(final IoReadFuture future) {
-            if(future.getException() != null) {
-                if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // Ssh dropped
-                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
-                } else {
-                    logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                }
-                invokeDisconnect();
-                return;
-            }
-
-            if (future.getRead() > 0) {
-                ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
-                // Schedule next read
-                buf = new Buffer(BUFFER_SIZE);
-                currentReadFuture = asyncOut.read(buf);
-                currentReadFuture.addListener(this);
-            }
-        }
-
-        private void invokeDisconnect() {
-            try {
-                asyncSshHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception e) {
-                // This should not happen
-                throw new IllegalStateException(e);
-            }
-        }
-
-        @Override
-        public synchronized void close() {
-            // Remove self as listener on close to prevent reading from closed input
-            if(currentReadFuture != null) {
-                currentReadFuture.removeListener(this);
-            }
-
-            asyncOut = null;
-        }
-    }
-
-    private static final class SshWriteAsyncHandler implements AutoCloseable {
-        public static final int MAX_PENDING_WRITES = 100;
-
-        private final ChannelOutboundHandler channelHandler;
-        private IoOutputStream asyncIn;
-
-        // Counter that holds the amount of pending write messages
-        // Pending write can occur in case remote window is full
-        // In such case, we need to wait for the pending write to finish
-        private int pendingWriteCounter;
-        // Last write future, that can be pending
-        private IoWriteFuture lastWriteFuture;
-
-        public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
-            this.channelHandler = channelHandler;
-            this.asyncIn = asyncIn;
-        }
-
-        int c = 0;
-
-        public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-            try {
-                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
-                    // If we are closed/closing, set immediate fail
-                    promise.setFailure(new IllegalStateException("Channel closed"));
-                } else {
-                    lastWriteFuture = asyncIn.write(toBuffer(msg));
-                    lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
-                        @Override
-                        public void operationComplete(final IoWriteFuture future) {
-                            ((ByteBuf) msg).release();
-
-                            // Notify success or failure
-                            if (future.isWritten()) {
-                                promise.setSuccess();
-                            } else {
-                                promise.setFailure(future.getException());
-                            }
-
-                            // Reset last pending future
-                            synchronized (SshWriteAsyncHandler.this) {
-                                lastWriteFuture = null;
-                            }
-                        }
-                    });
-                }
-            } catch (final WritePendingException e) {
-                // Check limit for pending writes
-                pendingWriteCounter++;
-                if(pendingWriteCounter > MAX_PENDING_WRITES) {
-                    promise.setFailure(e);
-                    handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
-                            ", remote window is not getting read or is too small"));
-                }
-
-                // We need to reset buffer read index, since we've already read it when we tried to write it the first time
-                ((ByteBuf) msg).resetReaderIndex();
-                logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
-                // In case of pending, re-invoke write after pending is finished
-                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
-                lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(final IoWriteFuture future) {
-                        // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
-                        // External thread could trigger write on this instance while we are on this line
-                        // Verify
-                        if (future.isWritten()) {
-                            synchronized (SshWriteAsyncHandler.this) {
-                                // Pending done, decrease counter
-                                pendingWriteCounter--;
-                                write(ctx, msg, promise);
-                            }
-                        } else {
-                            // Cannot reschedule pending, fail
-                            handlePendingFailed(ctx, e);
-                        }
-                    }
-
-                });
-            }
-        }
-
-        private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
-            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
-            try {
-                channelHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception ex) {
-                // This should not happen
-                throw new IllegalStateException(ex);
-            }
-        }
-
-        @Override
-        public void close() {
-            asyncIn = null;
-        }
-
-        private Buffer toBuffer(final Object msg) {
-            // TODO Buffer vs ByteBuf translate, Can we handle that better ?
-            Preconditions.checkState(msg instanceof ByteBuf);
-            final ByteBuf byteBuf = (ByteBuf) msg;
-            final byte[] temp = new byte[byteBuf.readableBytes()];
-            byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
-            return new Buffer(temp);
-        }
-
-    }
 }
diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java
new file mode 100644 (file)
index 0000000..eace0ac
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
+ * Also handles pending writes by caching requests until pending state is over.
+ */
+final class AsyncSshHandlerWriter implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(AsyncSshHandlerWriter.class);
+
+    // public static final int MAX_PENDING_WRITES = 1000;
+    // TODO implement Limiting mechanism for pending writes
+    // But there is a possible issue with limiting:
+    // 1. What to do when queue is full ? Immediate Fail for every request ?
+    // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
+    // when we send/queue 1 chunk and fail the other chunks
+
+    private IoOutputStream asyncIn;
+
+    // Order has to be preserved for queued writes
+    private final Deque<PendingWriteRequest> pending = new LinkedList<>();
+
+    public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
+        this.asyncIn = asyncIn;
+    }
+
+    public synchronized void write(final ChannelHandlerContext ctx,
+            final Object msg, final ChannelPromise promise) {
+        // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+        // If we are closed/closing, set immediate fail
+        if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+            promise.setFailure(new IllegalStateException("Channel closed"));
+        } else {
+            final ByteBuf byteBufMsg = (ByteBuf) msg;
+            if (pending.isEmpty() == false) {
+                queueRequest(ctx, byteBufMsg, promise);
+                return;
+            }
+
+            writeWithPendingDetection(ctx, promise, byteBufMsg);
+        }
+    }
+
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+        try {
+            if (logger.isTraceEnabled()) {
+                logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+            }
+            asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
+
+                        @Override
+                        public void operationComplete(final IoWriteFuture future) {
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                                        ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                            }
+
+                            // Notify success or failure
+                            if (future.isWritten()) {
+                                promise.setSuccess();
+                            } else {
+                                logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+                                promise.setFailure(future.getException());
+                            }
+
+                            // Not needed anymore, release
+                            byteBufMsg.release();
+
+                            // Check pending queue and schedule next
+                            // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+                            writePendingIfAny();
+                        }
+                    });
+        } catch (final WritePendingException e) {
+            queueRequest(ctx, byteBufMsg, promise);
+        }
+    }
+
+    private synchronized void writePendingIfAny() {
+        if (pending.peek() == null) {
+            return;
+        }
+
+        // In case of pending, reschedule next message from queue
+        final PendingWriteRequest pendingWrite = pending.poll();
+        final ByteBuf msg = pendingWrite.msg;
+        if (logger.isTraceEnabled()) {
+            logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+        }
+
+        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+    }
+
+    private static String byteBufToString(final ByteBuf msg) {
+        msg.resetReaderIndex();
+        final String s = msg.toString(Charsets.UTF_8);
+        msg.resetReaderIndex();
+        return s;
+    }
+
+    private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+//        try {
+        logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+        }
+        new PendingWriteRequest(ctx, msg, promise).pend(pending);
+//        } catch (final Exception ex) {
+//            logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+//            msg.release();
+//            promise.setFailure(ex);
+//        }
+    }
+
+    @Override
+    public synchronized void close() {
+        asyncIn = null;
+    }
+
+    private Buffer toBuffer(final ByteBuf msg) {
+        // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        final byte[] temp = new byte[msg.readableBytes()];
+        msg.readBytes(temp, 0, msg.readableBytes());
+        return new Buffer(temp);
+    }
+
+    private static final class PendingWriteRequest {
+        private final ChannelHandlerContext ctx;
+        private final ByteBuf msg;
+        private final ChannelPromise promise;
+
+        public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+            this.ctx = ctx;
+            // Reset reader index, last write (failed) attempt moved index to the end
+            msg.resetReaderIndex();
+            this.msg = msg;
+            this.promise = promise;
+        }
+
+        public void pend(final Queue<PendingWriteRequest> pending) {
+            // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+            // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+            // pending.size(), ctx.channel());
+            Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
+                    pending.size(), ctx.channel());
+        }
+    }
+}
index 223f2c7..d0fc43d 100644 (file)
@@ -23,12 +23,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.net.SocketAddress;
 
-import java.nio.channels.WritePendingException;
 import org.apache.sshd.ClientChannel;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.SshClient;
@@ -46,6 +43,7 @@ import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.Buffer;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mock;
@@ -59,6 +57,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
@@ -351,19 +351,16 @@ public class AsyncSshHandlerTest {
 
         // make first write stop pending
         firstWriteListener.operationComplete(ioWriteFuture);
-        // intercept third listener, this is regular listener for second write to determine success or failure
-        final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
 
         // notify listener for second write that pending has ended
         pendingListener.get().operationComplete(ioWriteFuture);
-        // Notify third listener (regular listener for second write) that second write succeeded
-        afterPendingListener.get().operationComplete(ioWriteFuture);
 
         // verify both write promises successful
         verify(firstWritePromise).setSuccess();
         verify(secondWritePromise).setSuccess();
     }
 
+    @Ignore("Pending queue is not limited")
     @Test
     public void testWritePendingMax() throws Exception {
         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
@@ -389,11 +386,11 @@ public class AsyncSshHandlerTest {
         final ChannelPromise secondWritePromise = getMockedPromise();
         // now make write throw pending exception
         doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
-        for (int i = 0; i < 1000; i++) {
+        for (int i = 0; i < 1001; i++) {
             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
         }
 
-        verify(ctx).fireChannelInactive();
+        verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
     }
 
     @Test