From 9a510c105432cb904cdbcdc63dc38e37a2a70e81 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Thu, 18 Sep 2014 17:58:48 +0200 Subject: [PATCH] BUG-1949 Fix race condition in AsyncSshHandler Remove pending limit due to unexpected behaviour with chunked messages. Extract Reader/Writer into separate classes. Also lower the amount of requests sent in SSH Stress integration test. Change-Id: Idff719ac3a6bed9e8939efa01b8306f2884848fe Signed-off-by: Maros Marsalek --- .../netconf/it/NetconfITSecureTest.java | 7 +- .../ssh/client/AsyncSshHanderReader.java | 87 ++++++++ .../handler/ssh/client/AsyncSshHandler.java | 191 +----------------- .../ssh/client/AsyncSshHandlerWriter.java | 173 ++++++++++++++++ .../ssh/client/AsyncSshHandlerTest.java | 15 +- 5 files changed, 273 insertions(+), 200 deletions(-) create mode 100644 opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java create mode 100644 opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java index 25251a9867..1adcd7e491 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java @@ -107,10 +107,9 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { /** * Test all requests are handled properly and no mismatch occurs in listener */ - /* Disabled until fixed - @Test(timeout = 5*60*1000) + @Test(timeout = 6*60*1000) public void testSecureStress() throws Exception { - final int requests = 10000; + final int requests = 4000; final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer()); final NetconfDeviceCommunicator sessionListener = getSessionListener(); @@ -152,7 +151,7 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { assertEquals(requests, responseCounter.get()); } } - */ + private NetconfMessage changeMessageId(final NetconfMessage getConfig, final int i) throws IOException, SAXException { String s = XmlUtil.toString(getConfig.getDocument(), false); s = s.replace("101", Integer.toString(i)); diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java new file mode 100644 index 0000000000..73a24f27b2 --- /dev/null +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; +import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoReadFuture; +import org.apache.sshd.common.util.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener on async input stream from SSH session. + * This listeners schedules reads in a loop until the session is closed or read fails. + */ +final class AsyncSshHanderReader implements SshFutureListener, AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class); + + private static final int BUFFER_SIZE = 8192; + + private final ChannelOutboundHandler asyncSshHandler; + private final ChannelHandlerContext ctx; + + private IoInputStream asyncOut; + private Buffer buf; + private IoReadFuture currentReadFuture; + + public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + this.asyncSshHandler = asyncSshHandler; + this.ctx = ctx; + this.asyncOut = asyncOut; + buf = new Buffer(BUFFER_SIZE); + asyncOut.read(buf).addListener(this); + } + + @Override + public synchronized void operationComplete(final IoReadFuture future) { + if(future.getException() != null) { + if(asyncOut.isClosed() || asyncOut.isClosing()) { + // Ssh dropped + logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); + } else { + logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); + } + invokeDisconnect(); + return; + } + + if (future.getRead() > 0) { + ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead())); + + // Schedule next read + buf = new Buffer(BUFFER_SIZE); + currentReadFuture = asyncOut.read(buf); + currentReadFuture.addListener(this); + } + } + + private void invokeDisconnect() { + try { + asyncSshHandler.disconnect(ctx, ctx.newPromise()); + } catch (final Exception e) { + // This should not happen + throw new IllegalStateException(e); + } + } + + @Override + public synchronized void close() { + // Remove self as listener on close to prevent reading from closed input + if(currentReadFuture != null) { + currentReadFuture.removeListener(this); + } + + asyncOut = null; + } +} diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 3d1e4784f2..3bd7232023 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -9,10 +9,7 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; import com.google.common.base.Preconditions; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import java.io.IOException; @@ -25,12 +22,6 @@ import org.apache.sshd.client.future.ConnectFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; -import org.apache.sshd.common.io.IoInputStream; -import org.apache.sshd.common.io.IoOutputStream; -import org.apache.sshd.common.io.IoReadFuture; -import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.WritePendingException; -import org.apache.sshd.common.util.Buffer; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +47,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final AuthenticationHandler authenticationHandler; private final SshClient sshClient; - private SshReadAsyncListener sshReadAsyncListener; - private SshWriteAsyncHandler sshWriteAsyncHandler; + private AsyncSshHanderReader sshReadAsyncListener; + private AsyncSshHandlerWriter sshWriteAsyncHandler; private ClientChannel channel; private ClientSession session; @@ -147,10 +138,10 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise.setSuccess(); connectPromise = null; - sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut()); + sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut()); // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null if(channel != null) { - sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); + sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn()); ctx.fireChannelActive(); } } @@ -207,178 +198,4 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { ctx.fireChannelInactive(); } - /** - * Listener over async input stream from SSH session. - * This listeners schedules reads in a loop until the session is closed or read fails. - */ - private static class SshReadAsyncListener implements SshFutureListener, AutoCloseable { - private static final int BUFFER_SIZE = 8192; - - private final ChannelOutboundHandler asyncSshHandler; - private final ChannelHandlerContext ctx; - - private IoInputStream asyncOut; - private Buffer buf; - private IoReadFuture currentReadFuture; - - public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { - this.asyncSshHandler = asyncSshHandler; - this.ctx = ctx; - this.asyncOut = asyncOut; - buf = new Buffer(BUFFER_SIZE); - asyncOut.read(buf).addListener(this); - } - - @Override - public synchronized void operationComplete(final IoReadFuture future) { - if(future.getException() != null) { - if(asyncOut.isClosed() || asyncOut.isClosing()) { - // Ssh dropped - logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); - } else { - logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - } - invokeDisconnect(); - return; - } - - if (future.getRead() > 0) { - ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead())); - - // Schedule next read - buf = new Buffer(BUFFER_SIZE); - currentReadFuture = asyncOut.read(buf); - currentReadFuture.addListener(this); - } - } - - private void invokeDisconnect() { - try { - asyncSshHandler.disconnect(ctx, ctx.newPromise()); - } catch (final Exception e) { - // This should not happen - throw new IllegalStateException(e); - } - } - - @Override - public synchronized void close() { - // Remove self as listener on close to prevent reading from closed input - if(currentReadFuture != null) { - currentReadFuture.removeListener(this); - } - - asyncOut = null; - } - } - - private static final class SshWriteAsyncHandler implements AutoCloseable { - public static final int MAX_PENDING_WRITES = 100; - - private final ChannelOutboundHandler channelHandler; - private IoOutputStream asyncIn; - - // Counter that holds the amount of pending write messages - // Pending write can occur in case remote window is full - // In such case, we need to wait for the pending write to finish - private int pendingWriteCounter; - // Last write future, that can be pending - private IoWriteFuture lastWriteFuture; - - public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) { - this.channelHandler = channelHandler; - this.asyncIn = asyncIn; - } - - int c = 0; - - public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - try { - if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { - // If we are closed/closing, set immediate fail - promise.setFailure(new IllegalStateException("Channel closed")); - } else { - lastWriteFuture = asyncIn.write(toBuffer(msg)); - lastWriteFuture.addListener(new SshFutureListener() { - - @Override - public void operationComplete(final IoWriteFuture future) { - ((ByteBuf) msg).release(); - - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - promise.setFailure(future.getException()); - } - - // Reset last pending future - synchronized (SshWriteAsyncHandler.this) { - lastWriteFuture = null; - } - } - }); - } - } catch (final WritePendingException e) { - // Check limit for pending writes - pendingWriteCounter++; - if(pendingWriteCounter > MAX_PENDING_WRITES) { - promise.setFailure(e); - handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() + - ", remote window is not getting read or is too small")); - } - - // We need to reset buffer read index, since we've already read it when we tried to write it the first time - ((ByteBuf) msg).resetReaderIndex(); - logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); - - // In case of pending, re-invoke write after pending is finished - Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e); - lastWriteFuture.addListener(new SshFutureListener() { - @Override - public void operationComplete(final IoWriteFuture future) { - // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first - // External thread could trigger write on this instance while we are on this line - // Verify - if (future.isWritten()) { - synchronized (SshWriteAsyncHandler.this) { - // Pending done, decrease counter - pendingWriteCounter--; - write(ctx, msg, promise); - } - } else { - // Cannot reschedule pending, fail - handlePendingFailed(ctx, e); - } - } - - }); - } - } - - private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) { - logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); - try { - channelHandler.disconnect(ctx, ctx.newPromise()); - } catch (final Exception ex) { - // This should not happen - throw new IllegalStateException(ex); - } - } - - @Override - public void close() { - asyncIn = null; - } - - private Buffer toBuffer(final Object msg) { - // TODO Buffer vs ByteBuf translate, Can we handle that better ? - Preconditions.checkState(msg instanceof ByteBuf); - final ByteBuf byteBuf = (ByteBuf) msg; - final byte[] temp = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); - return new Buffer(temp); - } - - } } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java new file mode 100644 index 0000000000..eace0ac7ea --- /dev/null +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.Deque; +import java.util.LinkedList; +import java.util.Queue; +import org.apache.sshd.common.future.SshFutureListener; +import org.apache.sshd.common.io.IoOutputStream; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.WritePendingException; +import org.apache.sshd.common.util.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server. + * Also handles pending writes by caching requests until pending state is over. + */ +final class AsyncSshHandlerWriter implements AutoCloseable { + + private static final Logger logger = LoggerFactory + .getLogger(AsyncSshHandlerWriter.class); + + // public static final int MAX_PENDING_WRITES = 1000; + // TODO implement Limiting mechanism for pending writes + // But there is a possible issue with limiting: + // 1. What to do when queue is full ? Immediate Fail for every request ? + // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur + // when we send/queue 1 chunk and fail the other chunks + + private IoOutputStream asyncIn; + + // Order has to be preserved for queued writes + private final Deque pending = new LinkedList<>(); + + public AsyncSshHandlerWriter(final IoOutputStream asyncIn) { + this.asyncIn = asyncIn; + } + + public synchronized void write(final ChannelHandlerContext ctx, + final Object msg, final ChannelPromise promise) { + // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here + // If we are closed/closing, set immediate fail + if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + promise.setFailure(new IllegalStateException("Channel closed")); + } else { + final ByteBuf byteBufMsg = (ByteBuf) msg; + if (pending.isEmpty() == false) { + queueRequest(ctx, byteBufMsg, promise); + return; + } + + writeWithPendingDetection(ctx, promise, byteBufMsg); + } + } + + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) { + try { + if (logger.isTraceEnabled()) { + logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); + } + asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener() { + + @Override + public void operationComplete(final IoWriteFuture future) { + if (logger.isTraceEnabled()) { + logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + } + + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); + promise.setFailure(future.getException()); + } + + // Not needed anymore, release + byteBufMsg.release(); + + // Check pending queue and schedule next + // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed + writePendingIfAny(); + } + }); + } catch (final WritePendingException e) { + queueRequest(ctx, byteBufMsg, promise); + } + } + + private synchronized void writePendingIfAny() { + if (pending.peek() == null) { + return; + } + + // In case of pending, reschedule next message from queue + final PendingWriteRequest pendingWrite = pending.poll(); + final ByteBuf msg = pendingWrite.msg; + if (logger.isTraceEnabled()) { + logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + } + + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg); + } + + private static String byteBufToString(final ByteBuf msg) { + msg.resetReaderIndex(); + final String s = msg.toString(Charsets.UTF_8); + msg.resetReaderIndex(); + return s; + } + + private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) { +// try { + logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size()); + if (logger.isTraceEnabled()) { + logger.trace("Queueing request due to pending: {}", byteBufToString(msg)); + } + new PendingWriteRequest(ctx, msg, promise).pend(pending); +// } catch (final Exception ex) { +// logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg)); +// msg.release(); +// promise.setFailure(ex); +// } + } + + @Override + public synchronized void close() { + asyncIn = null; + } + + private Buffer toBuffer(final ByteBuf msg) { + // TODO Buffer vs ByteBuf translate, Can we handle that better ? + final byte[] temp = new byte[msg.readableBytes()]; + msg.readBytes(temp, 0, msg.readableBytes()); + return new Buffer(temp); + } + + private static final class PendingWriteRequest { + private final ChannelHandlerContext ctx; + private final ByteBuf msg; + private final ChannelPromise promise; + + public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) { + this.ctx = ctx; + // Reset reader index, last write (failed) attempt moved index to the end + msg.resetReaderIndex(); + this.msg = msg; + this.promise = promise; + } + + public void pend(final Queue pending) { + // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES, + // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small", + // pending.size(), ctx.channel()); + Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s", + pending.size(), ctx.channel()); + } + } +} diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java index 223f2c7f94..d0fc43d04a 100644 --- a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java +++ b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java @@ -23,12 +23,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import java.io.IOException; import java.net.SocketAddress; -import java.nio.channels.WritePendingException; import org.apache.sshd.ClientChannel; import org.apache.sshd.ClientSession; import org.apache.sshd.SshClient; @@ -46,6 +43,7 @@ import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.util.Buffer; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mock; @@ -59,6 +57,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -351,19 +351,16 @@ public class AsyncSshHandlerTest { // make first write stop pending firstWriteListener.operationComplete(ioWriteFuture); - // intercept third listener, this is regular listener for second write to determine success or failure - final ListenableFuture> afterPendingListener = stubAddListener(ioWriteFuture); // notify listener for second write that pending has ended pendingListener.get().operationComplete(ioWriteFuture); - // Notify third listener (regular listener for second write) that second write succeeded - afterPendingListener.get().operationComplete(ioWriteFuture); // verify both write promises successful verify(firstWritePromise).setSuccess(); verify(secondWritePromise).setSuccess(); } + @Ignore("Pending queue is not limited") @Test public void testWritePendingMax() throws Exception { asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise); @@ -389,11 +386,11 @@ public class AsyncSshHandlerTest { final ChannelPromise secondWritePromise = getMockedPromise(); // now make write throw pending exception doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class)); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 1001; i++) { asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise); } - verify(ctx).fireChannelInactive(); + verify(secondWritePromise, times(1)).setFailure(any(Throwable.class)); } @Test -- 2.36.6