X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=3d1e4784f2ded5677472e3a25d99adaa11e24cf9;hp=0d877c9ec73797010013df229b9101d86445304f;hb=2473df9920f0820fde7dcaf62eaf14166695a5f6;hpb=99a0b01f27126a17ec2af165cf3fd9e53c977d2b diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 0d877c9ec7..3d1e4784f2 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -148,9 +148,11 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise = null; sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut()); - sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); - - ctx.fireChannelActive(); + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null + if(channel != null) { + sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); + ctx.fireChannelActive(); + } } private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { @@ -230,17 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void operationComplete(final IoReadFuture future) { if(future.getException() != null) { - if(asyncOut.isClosed() || asyncOut.isClosing()) { - // Ssh dropped logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); - invokeDisconnect(); - return; } else { logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - invokeDisconnect(); } + invokeDisconnect(); + return; } if (future.getRead() > 0) { @@ -324,22 +323,29 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { // Check limit for pending writes pendingWriteCounter++; if(pendingWriteCounter > MAX_PENDING_WRITES) { + promise.setFailure(e); handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() + ", remote window is not getting read or is too small")); } + // We need to reset buffer read index, since we've already read it when we tried to write it the first time + ((ByteBuf) msg).resetReaderIndex(); logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); // In case of pending, re-invoke write after pending is finished + Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e); lastWriteFuture.addListener(new SshFutureListener() { @Override public void operationComplete(final IoWriteFuture future) { + // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first + // External thread could trigger write on this instance while we are on this line + // Verify if (future.isWritten()) { synchronized (SshWriteAsyncHandler.this) { // Pending done, decrease counter pendingWriteCounter--; + write(ctx, msg, promise); } - write(ctx, msg, promise); } else { // Cannot reschedule pending, fail handlePendingFailed(ctx, e);